You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/03 14:05:14 UTC

[14/14] ignite git commit: Merge remote-tracking branch 'remotes/community/ignite-2.1.2' into ignite-gg-12389

Merge remote-tracking branch 'remotes/community/ignite-2.1.2' into ignite-gg-12389

# Conflicts:
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0ff1a987
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0ff1a987
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0ff1a987

Branch: refs/heads/ignite-gg-12389
Commit: 0ff1a987e15cc4cb8a59ab84cad70be4d599a870
Parents: 96718eb c4ddda3
Author: sboikov <sb...@gridgain.com>
Authored: Mon Jul 3 17:04:37 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Jul 3 17:04:37 2017 +0300

----------------------------------------------------------------------
 .../apache/ignite/internal/IgniteKernal.java    |  11 +-
 .../processors/cache/GridCacheAdapter.java      |   4 +
 .../GridCachePartitionExchangeManager.java      |  25 +--
 .../processors/cache/GridCacheProcessor.java    |   9 +-
 .../dht/GridClientPartitionTopology.java        |   5 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |   7 +-
 .../dht/preloader/GridDhtPartitionMap.java      |  13 +-
 .../GridDhtPartitionsExchangeFuture.java        |   2 +-
 .../GridCacheAtomicSequenceImpl.java            |  97 ++++++-----
 .../internal/util/GridPartitionStateMap.java    |  60 ++++++-
 .../CacheLateAffinityAssignmentTest.java        |  81 +++++++++-
 .../GridCachePartitionedNodeRestartTest.java    |   5 -
 .../testsuites/IgniteUtilSelfTestSuite.java     |   3 +
 .../ignite/util/GridPartitionMapSelfTest.java   | 162 +++++++++++++++++++
 .../helpers/jade/form/form-field-checkbox.pug   |   2 +-
 .../frontend/app/primitives/tooltip/index.pug   |   5 +-
 16 files changed, 405 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0ff1a987/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/0ff1a987/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index b227f6a,2b5a4ff..4f15519
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@@ -343,105 -309,35 +343,105 @@@ public class GridCachePartitionExchange
          }
      };
  
 -    /** {@inheritDoc} */
 -    @Override protected void start0() throws IgniteCheckedException {
 -        super.start0();
 +    private void processEventInactive(DiscoveryEvent evt, DiscoCache cache) {
 +        log.info("Ignore event: " + evt);
  
 -        exchWorker = new ExchangeWorker();
 +        // TODO GG-12389: finish operations with error.
 +   }
  
 -        cctx.gridEvents().addDiscoveryEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED,
 -            EVT_DISCOVERY_CUSTOM_EVT);
 +    /**
 +     * @param evt Event.
 +     * @param cache Discovery data cache.
 +     */
 +    private void onDiscoveryEvent(DiscoveryEvent evt, DiscoCache cache) {
 +        ClusterNode loc = cctx.localNode();
  
 -        cctx.io().addCacheHandler(0, GridDhtPartitionsSingleMessage.class,
 -            new MessageHandler<GridDhtPartitionsSingleMessage>() {
 -                @Override public void onMessage(ClusterNode node, GridDhtPartitionsSingleMessage msg) {
 -                    processSinglePartitionUpdate(node, msg);
 +        assert evt.type() == EVT_NODE_JOINED || evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED ||
 +            evt.type() == EVT_DISCOVERY_CUSTOM_EVT;
 +
 +        final ClusterNode n = evt.eventNode();
 +
 +        GridDhtPartitionExchangeId exchId = null;
 +        GridDhtPartitionsExchangeFuture exchFut = null;
 +
 +        if (evt.type() != EVT_DISCOVERY_CUSTOM_EVT) {
 +            assert evt.type() != EVT_NODE_JOINED || n.isLocal() || n.order() > loc.order() :
 +                "Node joined with smaller-than-local " +
 +                    "order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']';
 +
 +            exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
 +
 +            exchFut = exchangeFuture(exchId, evt, cache,null, null);
 +        }
 +        else {
 +            DiscoveryCustomMessage customMsg = ((DiscoveryCustomEvent)evt).customMessage();
 +
 +            if (customMsg instanceof ChangeGlobalStateMessage) {
 +                ChangeGlobalStateMessage stateChangeMsg = (ChangeGlobalStateMessage)customMsg;
 +
 +                ExchangeActions exchActions = stateChangeMsg.exchangeActions();
 +
 +                if (exchActions != null) {
 +                    exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
 +
 +                    exchFut = exchangeFuture(exchId, evt, cache, exchActions, null);
                  }
 -            });
 +            }
 +            else if (customMsg instanceof DynamicCacheChangeBatch) {
 +                DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customMsg;
  
 -        cctx.io().addCacheHandler(0, GridDhtPartitionsFullMessage.class,
 -            new MessageHandler<GridDhtPartitionsFullMessage>() {
 -                @Override public void onMessage(ClusterNode node, GridDhtPartitionsFullMessage msg) {
 -                    processFullPartitionUpdate(node, msg);
 +                ExchangeActions exchActions = batch.exchangeActions();
 +
 +                if (exchActions != null) {
 +                    exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
 +
 +                    exchFut = exchangeFuture(exchId, evt, cache, exchActions, null);
                  }
 -            });
 +            }
 +            else if (customMsg instanceof CacheAffinityChangeMessage) {
 +                CacheAffinityChangeMessage msg = (CacheAffinityChangeMessage)customMsg;
  
 -        cctx.io().addCacheHandler(0, GridDhtPartitionsSingleRequest.class,
 -            new MessageHandler<GridDhtPartitionsSingleRequest>() {
 -                @Override public void onMessage(ClusterNode node, GridDhtPartitionsSingleRequest msg) {
 -                    processSinglePartitionRequest(node, msg);
 +                if (msg.exchangeId() == null) {
 +                    if (msg.exchangeNeeded()) {
 +                        exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
 +
 +                        exchFut = exchangeFuture(exchId, evt, cache, null, msg);
 +                    }
                  }
-                 else
 -            });
++                else if (msg.exchangeId().topologyVersion().topologyVersion() >= cctx.discovery().localJoinEvent().topologyVersion())
 +                    exchangeFuture(msg.exchangeId(), null, null, null, null)
 +                        .onAffinityChangeMessage(evt.eventNode(), msg);
 +            }
 +            else if (customMsg instanceof StartSnapshotOperationAckDiscoveryMessage
 +                && ((StartSnapshotOperationAckDiscoveryMessage)customMsg).needExchange()) {
 +                exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
 +
 +                exchFut = exchangeFuture(exchId, evt, null, null, null);
 +            }
 +            else {
 +                // Process event as custom discovery task if needed.
 +                CachePartitionExchangeWorkerTask task =
 +                    cctx.cache().exchangeTaskForCustomDiscoveryMessage(customMsg);
 +
 +                if (task != null)
 +                    exchWorker.addCustomTask(task);
 +            }
 +        }
 +
 +        if (exchId != null) {
 +            if (log.isDebugEnabled())
 +                log.debug("Discovery event (will start exchange): " + exchId);
 +
 +            // Event callback - without this callback future will never complete.
 +            exchFut.onEvent(exchId, evt, cache);
 +
 +            // Start exchange process.
 +            addFuture(exchFut);
 +        }
 +        else {
 +            if (log.isDebugEnabled())
 +                log.debug("Do not start exchange for discovery event: " + evt);
 +        }
      }
  
      /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/0ff1a987/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/0ff1a987/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/0ff1a987/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/0ff1a987/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
----------------------------------------------------------------------