You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/07/04 10:02:29 UTC
[23/50] ignite git commit: Merge remote-tracking branch
'remotes/origin/master' into ignite-2.1.2
Merge remote-tracking branch 'remotes/origin/master' into ignite-2.1.2
# Conflicts:
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f0f59631
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f0f59631
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f0f59631
Branch: refs/heads/master
Commit: f0f59631ec05a281af671f6cc246ca3ef443e083
Parents: 4adfca9 8445b31
Author: sboikov <sb...@gridgain.com>
Authored: Wed Jun 28 09:06:23 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Jun 28 09:06:23 2017 +0300
----------------------------------------------------------------------
.../dht/GridClientPartitionTopology.java | 2 +-
.../dht/GridDhtPartitionTopologyImpl.java | 153 ++++----
.../GridDhtPartitionsExchangeFuture.java | 24 +-
.../DynamicIndexAbstractBasicSelfTest.java | 81 ++++-
...exingComplexClientAtomicPartitionedTest.java | 33 ++
...dexingComplexClientAtomicReplicatedTest.java | 33 ++
...mplexClientTransactionalPartitionedTest.java | 33 ++
...omplexClientTransactionalReplicatedTest.java | 33 ++
...exingComplexServerAtomicPartitionedTest.java | 33 ++
...dexingComplexServerAtomicReplicatedTest.java | 33 ++
...mplexServerTransactionalPartitionedTest.java | 33 ++
...omplexServerTransactionalReplicatedTest.java | 33 ++
.../index/H2DynamicIndexingComplexTest.java | 356 +++++++++++++++++++
.../cache/index/H2DynamicTableSelfTest.java | 182 ++++++++--
.../IgniteCacheQuerySelfTestSuite.java | 30 +-
15 files changed, 961 insertions(+), 131 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f0f59631/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f0f59631/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index ce19c6b,bb6aab3..a6f1831
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@@ -59,7 -60,9 +59,8 @@@ import org.apache.ignite.internal.util.
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
+ import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.EVICTED;
import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.LOST;
import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
@@@ -299,91 -383,93 +300,93 @@@ public class GridDhtPartitionTopologyIm
* @param updateSeq Update sequence.
*/
private void initPartitions0(GridDhtPartitionsExchangeFuture exchFut, long updateSeq) {
- ClusterNode loc = ctx.localNode();
+ List<List<ClusterNode>> aff = grp.affinity().assignments(exchFut.topologyVersion());
- ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
+ if (grp.affinityNode()) {
+ ClusterNode loc = ctx.localNode();
- GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
+ ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
- assert topVer.equals(exchFut.topologyVersion()) :
- "Invalid topology [topVer=" + topVer +
- ", grp=" + grp.cacheOrGroupName() +
- ", futVer=" + exchFut.topologyVersion() +
- ", fut=" + exchFut + ']';
- assert grp.affinity().lastVersion().equals(exchFut.topologyVersion()) :
- "Invalid affinity [topVer=" + grp.affinity().lastVersion() +
- ", grp=" + grp.cacheOrGroupName() +
- ", futVer=" + exchFut.topologyVersion() +
- ", fut=" + exchFut + ']';
+ GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
- List<List<ClusterNode>> aff = grp.affinity().assignments(exchFut.topologyVersion());
+ assert topVer.equals(exchFut.topologyVersion()) :
+ "Invalid topology [topVer=" + topVer +
+ ", grp=" + grp.cacheOrGroupName() +
+ ", futVer=" + exchFut.topologyVersion() +
+ ", fut=" + exchFut + ']';
+ assert grp.affinity().lastVersion().equals(exchFut.topologyVersion()) :
+ "Invalid affinity [topVer=" + grp.affinity().lastVersion() +
+ ", grp=" + grp.cacheOrGroupName() +
+ ", futVer=" + exchFut.topologyVersion() +
+ ", fut=" + exchFut + ']';
- int num = grp.affinity().partitions();
+ int num = grp.affinity().partitions();
- if (grp.rebalanceEnabled()) {
- boolean added = exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom());
+ if (grp.rebalanceEnabled()) {
+ boolean added = exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom());
- boolean first = added || (loc.equals(oldest) && loc.id().equals(exchId.nodeId()) && exchId.isJoined());
+ boolean first = added || (loc.equals(oldest) && loc.id().equals(exchId.nodeId()) && exchId.isJoined());
- if (first) {
- assert exchId.isJoined() || added;
+ if (first) {
+ assert exchId.isJoined() || added;
- for (int p = 0; p < num; p++) {
- if (localNode(p, aff)) {
- GridDhtLocalPartition locPart = createPartition(p);
+ for (int p = 0; p < num; p++) {
+ if (localNode(p, aff)) {
+ GridDhtLocalPartition locPart = createPartition(p);
- boolean owned = locPart.own();
+ boolean owned = locPart.own();
- assert owned : "Failed to own partition for oldest node [grp=" + grp.cacheOrGroupName() +
- ", part=" + locPart + ']';
+ assert owned : "Failed to own partition for oldest node [grp=" + grp.cacheOrGroupName() +
+ ", part=" + locPart + ']';
- if (log.isDebugEnabled())
- log.debug("Owned partition for oldest node: " + locPart);
+ if (log.isDebugEnabled())
+ log.debug("Owned partition for oldest node: " + locPart);
- updateSeq = updateLocal(p, locPart.state(), updateSeq);
+ updateSeq = updateLocal(p, locPart.state(), updateSeq);
+ }
}
}
+ else
+ createPartitions(aff, updateSeq);
}
- else
- createPartitions(aff, updateSeq);
- }
- else {
- // If preloader is disabled, then we simply clear out
- // the partitions this node is not responsible for.
- for (int p = 0; p < num; p++) {
- GridDhtLocalPartition locPart = localPartition0(p, topVer, false, true, false);
+ else {
+ // If preloader is disabled, then we simply clear out
+ // the partitions this node is not responsible for.
+ for (int p = 0; p < num; p++) {
- GridDhtLocalPartition locPart = localPartition(p, topVer, false, false);
++ GridDhtLocalPartition locPart = localPartition0(p, topVer, false, true, false);
- boolean belongs = localNode(p, aff);
+ boolean belongs = localNode(p, aff);
- if (locPart != null) {
- if (!belongs) {
- GridDhtPartitionState state = locPart.state();
+ if (locPart != null) {
+ if (!belongs) {
+ GridDhtPartitionState state = locPart.state();
- if (state.active()) {
- locPart.rent(false);
+ if (state.active()) {
+ locPart.rent(false);
- updateSeq = updateLocal(p, locPart.state(), updateSeq);
+ updateSeq = updateLocal(p, locPart.state(), updateSeq);
- if (log.isDebugEnabled())
- log.debug("Evicting partition with rebalancing disabled " +
- "(it does not belong to affinity): " + locPart);
+ if (log.isDebugEnabled())
+ log.debug("Evicting partition with rebalancing disabled " +
+ "(it does not belong to affinity): " + locPart);
+ }
}
+ else
+ locPart.own();
}
- else
- locPart.own();
- }
- else if (belongs) {
- locPart = createPartition(p);
+ else if (belongs) {
+ locPart = createPartition(p);
- locPart.own();
+ locPart.own();
- updateLocal(p, locPart.state(), updateSeq);
+ updateLocal(p, locPart.state(), updateSeq);
+ }
}
}
- }
- if (node2part != null && node2part.valid())
- checkEvictions(updateSeq, aff);
+ if (node2part != null && node2part.valid())
+ checkEvictions(updateSeq, aff);
+ }
updateRebalanceVersion(aff);
}
@@@ -430,81 -522,94 +436,84 @@@
ctx.database().checkpointReadLock();
- synchronized (ctx.exchange().interruptLock()) {
- if (Thread.currentThread().isInterrupted())
- throw new IgniteInterruptedCheckedException("Thread is interrupted: " + Thread.currentThread());
+ try {
+ synchronized (ctx.exchange().interruptLock()) {
+ if (Thread.currentThread().isInterrupted())
+ throw new IgniteInterruptedCheckedException("Thread is interrupted: " + Thread.currentThread());
- try {
U.writeLock(lock);
- }
- catch (IgniteInterruptedCheckedException e) {
- ctx.database().checkpointReadUnlock();
-
- throw e;
- }
- try {
- GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
+ try {
+
- if (stopping)
- return;
+
- assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" +
- topVer + ", exchId=" + exchId + ']';
+ if (stopping)
+ return;
- if (exchId.isLeft())
- GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
-
- assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" +
++ GridDhtPartitionExchangeId exchId = exchFut.exchangeId();assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" +
+ topVer + ", exchId=" + exchId + ']';
+
- if (exchId.isLeft() && exchFut.serverNodeDiscoveryEvent())
- removeNode(exchId.nodeId());
++ if (exchId.isLeft() && exchFut.serverNodeDiscoveryEvent())
+ removeNode(exchId.nodeId());
- ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
+ ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
- if (log.isDebugEnabled())
- log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']');
+ if (log.isDebugEnabled())
+ log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']');
- long updateSeq = this.updateSeq.incrementAndGet();
+ long updateSeq = this.updateSeq.incrementAndGet();
- cntrMap.clear();
+ cntrMap.clear();
- // If this is the oldest node.
- if (oldest != null && (loc.equals(oldest) || exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom()))) {
- if (node2part == null) {
- node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
- boolean grpStarted = exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom());
-
- // If this is the oldest node.
++ boolean grpStarted = exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom());// If this is the oldest node.
+ if (oldest != null && (loc.equals(oldest) || grpStarted)) {
+ if (node2part == null) {
+ node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
- if (log.isDebugEnabled())
- log.debug("Created brand new full topology map on oldest node [exchId=" +
- exchId + ", fullMap=" + fullMapString() + ']');
- }
- else if (!node2part.valid()) {
- node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false);
+ if (log.isDebugEnabled())
+ log.debug("Created brand new full topology map on oldest node [exchId=" +
+ exchId + ", fullMap=" + fullMapString() + ']');
+ }
+ else if (!node2part.valid()) {
+ node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false);
- if (log.isDebugEnabled())
- log.debug("Created new full topology map on oldest node [exchId=" + exchId + ", fullMap=" +
- node2part + ']');
- }
- else if (!node2part.nodeId().equals(loc.id())) {
- node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false);
+ if (log.isDebugEnabled())
+ log.debug("Created new full topology map on oldest node [exchId=" + exchId + ", fullMap=" +
+ node2part + ']');
+ }
+ else if (!node2part.nodeId().equals(loc.id())) {
+ node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false);
- if (log.isDebugEnabled())
- log.debug("Copied old map into new map on oldest node (previous oldest node left) [exchId=" +
- exchId + ", fullMap=" + fullMapString() + ']');
+ if (log.isDebugEnabled())
+ log.debug("Copied old map into new map on oldest node (previous oldest node left) [exchId=" +
+ exchId + ", fullMap=" + fullMapString() + ']');
+ }
}
- }
- if (affReady)
- initPartitions0(exchFut, updateSeq);
- else {
- List<List<ClusterNode>> aff = grp.affinity().idealAssignment();
+ if (grpStarted ||
+ exchFut.discoveryEvent().type() == EVT_DISCOVERY_CUSTOM_EVT ||
- exchFut.serverNodeDiscoveryEvent()) {
- if (affReady)
- initPartitions0(exchFut, updateSeq);
- else {
- List<List<ClusterNode>> aff = grp.affinity().idealAssignment();
++ exchFut.serverNodeDiscoveryEvent()) {if (affReady)
++ initPartitions0(exchFut, updateSeq);
++ else {
++ List<List<ClusterNode>> aff = grp.affinity().idealAssignment();
createPartitions(aff, updateSeq);
}
+ }
- consistencyCheck();
-
- if (log.isDebugEnabled())
- log.debug("Partition map after beforeExchange [exchId=" + exchId + ", fullMap=" +
- fullMapString() + ']');
- }
- finally {
- lock.writeLock().unlock();
+ consistencyCheck();
- ctx.database().checkpointReadUnlock();
+ if (log.isDebugEnabled())
+ log.debug("Partition map after beforeExchange [exchId=" + exchId + ", fullMap=" +
+ fullMapString() + ']');
+ }
+ finally {
+ lock.writeLock().unlock();
+ }
}
}
-
- // Wait for evictions.
- waitForRent();
+ finally {
+ ctx.database().checkpointReadUnlock();
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/f0f59631/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f0f59631/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractBasicSelfTest.java
----------------------------------------------------------------------