You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/03 14:05:14 UTC
[14/14] ignite git commit: Merge remote-tracking branch
'remotes/community/ignite-2.1.2' into ignite-gg-12389
Merge remote-tracking branch 'remotes/community/ignite-2.1.2' into ignite-gg-12389
# Conflicts:
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0ff1a987
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0ff1a987
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0ff1a987
Branch: refs/heads/ignite-gg-12389
Commit: 0ff1a987e15cc4cb8a59ab84cad70be4d599a870
Parents: 96718eb c4ddda3
Author: sboikov <sb...@gridgain.com>
Authored: Mon Jul 3 17:04:37 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Jul 3 17:04:37 2017 +0300
----------------------------------------------------------------------
.../apache/ignite/internal/IgniteKernal.java | 11 +-
.../processors/cache/GridCacheAdapter.java | 4 +
.../GridCachePartitionExchangeManager.java | 25 +--
.../processors/cache/GridCacheProcessor.java | 9 +-
.../dht/GridClientPartitionTopology.java | 5 +-
.../dht/GridDhtPartitionTopologyImpl.java | 7 +-
.../dht/preloader/GridDhtPartitionMap.java | 13 +-
.../GridDhtPartitionsExchangeFuture.java | 2 +-
.../GridCacheAtomicSequenceImpl.java | 97 ++++++-----
.../internal/util/GridPartitionStateMap.java | 60 ++++++-
.../CacheLateAffinityAssignmentTest.java | 81 +++++++++-
.../GridCachePartitionedNodeRestartTest.java | 5 -
.../testsuites/IgniteUtilSelfTestSuite.java | 3 +
.../ignite/util/GridPartitionMapSelfTest.java | 162 +++++++++++++++++++
.../helpers/jade/form/form-field-checkbox.pug | 2 +-
.../frontend/app/primitives/tooltip/index.pug | 5 +-
16 files changed, 405 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/0ff1a987/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/0ff1a987/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index b227f6a,2b5a4ff..4f15519
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@@ -343,105 -309,35 +343,105 @@@ public class GridCachePartitionExchange
}
};
- /** {@inheritDoc} */
- @Override protected void start0() throws IgniteCheckedException {
- super.start0();
+ private void processEventInactive(DiscoveryEvent evt, DiscoCache cache) {
+ log.info("Ignore event: " + evt);
- exchWorker = new ExchangeWorker();
+ // TODO GG-12389: finish operations with error.
+ }
- cctx.gridEvents().addDiscoveryEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED,
- EVT_DISCOVERY_CUSTOM_EVT);
+ /**
+ * @param evt Event.
+ * @param cache Discovery data cache.
+ */
+ private void onDiscoveryEvent(DiscoveryEvent evt, DiscoCache cache) {
+ ClusterNode loc = cctx.localNode();
- cctx.io().addCacheHandler(0, GridDhtPartitionsSingleMessage.class,
- new MessageHandler<GridDhtPartitionsSingleMessage>() {
- @Override public void onMessage(ClusterNode node, GridDhtPartitionsSingleMessage msg) {
- processSinglePartitionUpdate(node, msg);
+ assert evt.type() == EVT_NODE_JOINED || evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED ||
+ evt.type() == EVT_DISCOVERY_CUSTOM_EVT;
+
+ final ClusterNode n = evt.eventNode();
+
+ GridDhtPartitionExchangeId exchId = null;
+ GridDhtPartitionsExchangeFuture exchFut = null;
+
+ if (evt.type() != EVT_DISCOVERY_CUSTOM_EVT) {
+ assert evt.type() != EVT_NODE_JOINED || n.isLocal() || n.order() > loc.order() :
+ "Node joined with smaller-than-local " +
+ "order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']';
+
+ exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
+
+ exchFut = exchangeFuture(exchId, evt, cache,null, null);
+ }
+ else {
+ DiscoveryCustomMessage customMsg = ((DiscoveryCustomEvent)evt).customMessage();
+
+ if (customMsg instanceof ChangeGlobalStateMessage) {
+ ChangeGlobalStateMessage stateChangeMsg = (ChangeGlobalStateMessage)customMsg;
+
+ ExchangeActions exchActions = stateChangeMsg.exchangeActions();
+
+ if (exchActions != null) {
+ exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
+
+ exchFut = exchangeFuture(exchId, evt, cache, exchActions, null);
}
- });
+ }
+ else if (customMsg instanceof DynamicCacheChangeBatch) {
+ DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customMsg;
- cctx.io().addCacheHandler(0, GridDhtPartitionsFullMessage.class,
- new MessageHandler<GridDhtPartitionsFullMessage>() {
- @Override public void onMessage(ClusterNode node, GridDhtPartitionsFullMessage msg) {
- processFullPartitionUpdate(node, msg);
+ ExchangeActions exchActions = batch.exchangeActions();
+
+ if (exchActions != null) {
+ exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
+
+ exchFut = exchangeFuture(exchId, evt, cache, exchActions, null);
}
- });
+ }
+ else if (customMsg instanceof CacheAffinityChangeMessage) {
+ CacheAffinityChangeMessage msg = (CacheAffinityChangeMessage)customMsg;
- cctx.io().addCacheHandler(0, GridDhtPartitionsSingleRequest.class,
- new MessageHandler<GridDhtPartitionsSingleRequest>() {
- @Override public void onMessage(ClusterNode node, GridDhtPartitionsSingleRequest msg) {
- processSinglePartitionRequest(node, msg);
+ if (msg.exchangeId() == null) {
+ if (msg.exchangeNeeded()) {
+ exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
+
+ exchFut = exchangeFuture(exchId, evt, cache, null, msg);
+ }
}
- else
- });
++ else if (msg.exchangeId().topologyVersion().topologyVersion() >= cctx.discovery().localJoinEvent().topologyVersion())
+ exchangeFuture(msg.exchangeId(), null, null, null, null)
+ .onAffinityChangeMessage(evt.eventNode(), msg);
+ }
+ else if (customMsg instanceof StartSnapshotOperationAckDiscoveryMessage
+ && ((StartSnapshotOperationAckDiscoveryMessage)customMsg).needExchange()) {
+ exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
+
+ exchFut = exchangeFuture(exchId, evt, null, null, null);
+ }
+ else {
+ // Process event as custom discovery task if needed.
+ CachePartitionExchangeWorkerTask task =
+ cctx.cache().exchangeTaskForCustomDiscoveryMessage(customMsg);
+
+ if (task != null)
+ exchWorker.addCustomTask(task);
+ }
+ }
+
+ if (exchId != null) {
+ if (log.isDebugEnabled())
+ log.debug("Discovery event (will start exchange): " + exchId);
+
+ // Event callback - without this callback future will never complete.
+ exchFut.onEvent(exchId, evt, cache);
+
+ // Start exchange process.
+ addFuture(exchFut);
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Do not start exchange for discovery event: " + evt);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/0ff1a987/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/0ff1a987/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/0ff1a987/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/0ff1a987/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
----------------------------------------------------------------------