You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/18 16:50:24 UTC

incubator-ignite git commit: # ignite-23 avoid exchange for client node join/leave

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-23 [created] a2a6f31fe


# ignite-23 avoid exchange for client node join/leave


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a2a6f31f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a2a6f31f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a2a6f31f

Branch: refs/heads/ignite-23
Commit: a2a6f31fe4207dbb3cd2146c3f22a5c99d83fc5b
Parents: 5c8591c
Author: sboikov <sb...@gridgain.com>
Authored: Mon May 18 17:50:06 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon May 18 17:50:06 2015 +0300

----------------------------------------------------------------------
 .../affinity/GridAffinityAssignmentCache.java   |  27 +++
 .../cache/GridCacheAffinityManager.java         |  12 ++
 .../GridDhtPartitionsExchangeFuture.java        | 100 +++++++---
 .../IgniteCacheClientNodeExchangeTest.java      | 184 +++++++++++++++++++
 .../testsuites/IgniteCacheTestSuite2.java       |   2 +
 5 files changed, 300 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a2a6f31f/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index eccd9f9..18ac65a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -103,6 +103,32 @@ public class GridAffinityAssignmentCache {
     }
 
     /**
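+     * @param node Client node that joined or left (asserted to own no primary or backup partitions).
+     * @param topVer New topology version; the current head assignment is re-registered under it.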
+     */
+    public void clientNodeTopologyChange(ClusterNode node, AffinityTopologyVersion topVer) {
+        GridAffinityAssignment assignment = head.get();
+
+        assert assignment.primaryPartitions(node.id()).isEmpty() : node;
+        assert assignment.backupPartitions(node.id()).isEmpty() : node;
+
+        GridAffinityAssignment assignmentCpy = new GridAffinityAssignment(topVer, assignment.assignment());
+
+        affCache.put(topVer, assignmentCpy);
+        head.set(assignmentCpy);
+
+        for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : readyFuts.entrySet()) {
+            if (entry.getKey().compareTo(topVer) <= 0) {
+                if (log.isDebugEnabled())
+                    log.debug("Completing topology ready future (use previous affinity) " +
+                        "[locNodeId=" + ctx.localNodeId() + ", futVer=" + entry.getKey() + ", topVer=" + topVer + ']');
+
+                entry.getValue().onDone(topVer);
+            }
+        }
+    }
+
+    /**
      * Initializes affinity with given topology version and assignment. The assignment is calculated on remote nodes
      * and brought to local node on partition map exchange.
      *
@@ -422,6 +448,7 @@ public class GridAffinityAssignmentCache {
 
         /**
          *
+         * @param reqTopVer Required topology version.
          */
         private AffinityReadyFuture(AffinityTopologyVersion reqTopVer) {
             this.reqTopVer = reqTopVer;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a2a6f31f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
index fe7efd5..6541e9a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
@@ -140,6 +140,7 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter {
      *
      * @param topVer Topology version to calculate affinity for.
      * @param discoEvt Discovery event that causes this topology change.
+     * @return Affinity assignments.
      */
     public List<List<ClusterNode>> calculateAffinity(AffinityTopologyVersion topVer, DiscoveryEvent discoEvt) {
         assert !cctx.isLocal();
@@ -148,6 +149,17 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter {
     }
 
     /**
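+     * Propagates a client node join/leave to the affinity assignment cache without recalculating affinity.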
+     * @param node Node.
+     * @param topVer Topology version.
+     */
+    public void clientNodeTopologyChange(ClusterNode node, AffinityTopologyVersion topVer) {
+        assert !cctx.isLocal();
+
+        aff.clientNodeTopologyChange(node, topVer);
+    }
+
+    /**
      * @return Partition count.
      */
     public int partitions() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a2a6f31f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 599e391..f4dcf3b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -44,6 +44,7 @@ import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 import java.util.concurrent.locks.*;
 
+import static org.apache.ignite.events.EventType.*;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
 
 /**
@@ -146,6 +147,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     /** Dynamic cache change requests. */
     private Collection<DynamicCacheChangeRequest> reqs;
 
+    /** Cache validation results. */
     private volatile Map<Integer, Boolean> cacheValidRes;
 
     /**
@@ -200,6 +202,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      * @param cctx Cache context.
      * @param busyLock Busy lock.
      * @param exchId Exchange ID.
+     * @param reqs Cache change requests.
      */
     public GridDhtPartitionsExchangeFuture(
         GridCacheSharedContext cctx,
@@ -322,7 +325,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     }
 
     /**
-     * Rechecks topology.
+     * @param cacheCtx Cache context.
+     * @throws IgniteCheckedException If failed.
      */
     private void initTopology(GridCacheContext cacheCtx) throws IgniteCheckedException {
         if (stopping(cacheCtx.cacheId()))
@@ -471,6 +475,62 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 // will return corresponding nodes.
                 U.await(evtLatch);
 
+                if (!dummy && !forcePreload && F.isEmpty(reqs)) { // If exchange initiated by node join or leave.
+                    assert discoEvt != null;
+
+                    int type = discoEvt.type();
+
+                    assert type == EVT_NODE_JOINED || type == EVT_NODE_LEFT || type == EVT_NODE_FAILED : discoEvt;
+
+                    ClusterNode node = discoEvt.eventNode();
+
+                    if (!node.isLocal()) {
+                        boolean affNode = false;
+
+                        for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+                            if (cacheCtx.isLocal())
+                                continue;
+
+                            if (CU.affinityNode(node, cacheCtx.config().getNodeFilter())) {
+                                affNode = true;
+
+                                break;
+                            }
+                        }
+
+                        if (!affNode) {
+                            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+                                if (cacheCtx.isLocal())
+                                    continue;
+
+                                cacheCtx.affinity().clientNodeTopologyChange(node, exchId.topologyVersion());
+
+                                GridDhtPartitionTopology top = cacheCtx.topology();
+
+                                GridDhtPartitionMap parts = top.partitions(node.id());
+
+                                assert parts == null || parts.size() == 0 : parts;
+
+                                top.updateTopologyVersion(exchId, this, -1, stopping(cacheCtx.cacheId()));
+                            }
+
+                            if (!exchId.isLeft()) {
+                                rmtNodes = new ConcurrentLinkedQueue<>(F.asList(node));
+
+                                rmtIds = F.asList(node.id());
+                            }
+
+                            ready.set(true);
+
+                            initFut.onDone(true);
+
+                            onDone(exchId.topologyVersion());
+
+                            return;
+                        }
+                    }
+                }
+
                 startCaches();
 
                 for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
@@ -484,8 +544,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
                 List<String> cachesWithoutNodes = null;
 
-                for (String name : cctx.cache().cacheNames()) {
-                    if (exchId.isLeft()) {
+                if (exchId.isLeft()) {
+                    for (String name : cctx.cache().cacheNames()) {
                         if (cctx.discovery().cacheAffinityNodes(name, topologyVersion()).isEmpty()) {
                             if (cachesWithoutNodes == null)
                                 cachesWithoutNodes = new ArrayList<>();
@@ -521,7 +581,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 }
 
                 if (cachesWithoutNodes != null) {
-                    StringBuilder sb = new StringBuilder("All server nodes for the following caches have left the cluster: ");
+                    StringBuilder sb =
+                        new StringBuilder("All server nodes for the following caches have left the cluster: ");
 
                     for (int i = 0; i < cachesWithoutNodes.size(); i++) {
                         String cache = cachesWithoutNodes.get(i);
@@ -666,36 +727,25 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             if (log.isDebugEnabled())
                 log.debug("Initialized future: " + this);
 
-            if (canSkipExchange())
-                onDone(exchId.topologyVersion());
+            // If this node is not oldest.
+            if (!oldestNode.get().id().equals(cctx.localNodeId()))
+                sendPartitions();
             else {
-                // If this node is not oldest.
-                if (!oldestNode.get().id().equals(cctx.localNodeId()))
-                    sendPartitions();
-                else {
-                    boolean allReceived = allReceived();
-
-                    if (allReceived && replied.compareAndSet(false, true)) {
-                        if (spreadPartitions())
-                            onDone(exchId.topologyVersion());
-                    }
-                }
+                boolean allReceived = allReceived();
 
-                scheduleRecheck();
+                if (allReceived && replied.compareAndSet(false, true)) {
+                    if (spreadPartitions())
+                        onDone(exchId.topologyVersion());
+                }
             }
+
+            scheduleRecheck();
         }
         else
             assert false : "Skipped init future: " + this;
     }
 
     /**
-     * @return {@code True} if no distributed exchange is needed.
-     */
-    private boolean canSkipExchange() {
-        return false; // TODO ignite-23;
-    }
-
-    /**
      *
      */
     private void dumpPendingObjects() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a2a6f31f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeExchangeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeExchangeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeExchangeTest.java
new file mode 100644
index 0000000..66db3c6
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeExchangeTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.eclipse.jetty.util.*;
+
+import java.util.*;
+
+/**
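+ * Checks which partitions exchange messages are sent when a client node joins or leaves the topology.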
+ */
+public class IgniteCacheClientNodeExchangeTest extends GridCommonAbstractTest {
+    /** */
+    protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        cfg.setClientMode(client);
+
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        cfg.setCacheConfiguration(ccfg);
+
+        cfg.setCommunicationSpi(new TestCommunicationSpi());
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNoPartitionExchangeForClient() throws Exception {
+        Ignite ignite0 = startGrid(0);
+
+        TestCommunicationSpi spi0 = (TestCommunicationSpi)ignite0.configuration().getCommunicationSpi();
+
+        Ignite ignite1 = startGrid(1);
+
+        TestCommunicationSpi spi1 = (TestCommunicationSpi)ignite1.configuration().getCommunicationSpi();
+
+        assertEquals(0, spi0.partitionsSingleMessages().size());
+        assertEquals(1, spi0.partitionsFullMessages().size());
+
+        assertEquals(1, spi1.partitionsSingleMessages().size());
+        assertEquals(0, spi1.partitionsFullMessages().size());
+
+        spi0.reset();
+        spi1.reset();
+
+        client = true;
+
+        for (int i = 0; i < 3; i++) {
+            log.info("Start client node: " + i);
+
+            Ignite ignite2 = startGrid(2);
+
+            TestCommunicationSpi spi2 = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi();
+
+            assertEquals(0, spi0.partitionsSingleMessages().size());
+            assertEquals(1, spi0.partitionsFullMessages().size());
+
+            assertEquals(0, spi1.partitionsSingleMessages().size());
+            assertEquals(0, spi1.partitionsFullMessages().size());
+
+            assertEquals(1, spi2.partitionsSingleMessages().size());
+            assertEquals(0, spi2.partitionsFullMessages().size());
+
+            spi0.reset();
+            spi1.reset();
+            spi2.reset();
+
+            log.info("Stop client node.");
+
+            ignite2.close();
+
+            assertEquals(0, spi0.partitionsSingleMessages().size());
+            assertEquals(0, spi0.partitionsFullMessages().size());
+
+            assertEquals(0, spi1.partitionsSingleMessages().size());
+            assertEquals(0, spi1.partitionsFullMessages().size());
+        }
+    }
+
+    /**
+     * Communication SPI that records sent partitions single/full messages having a non-null exchange ID.
+     */
+    private static class TestCommunicationSpi extends TcpCommunicationSpi {
+        /** */
+        private ConcurrentHashSet<GridDhtPartitionsSingleMessage> partSingleMsgs = new ConcurrentHashSet<>();
+
+        /** */
+        private ConcurrentHashSet<GridDhtPartitionsFullMessage> partFullMsgs = new ConcurrentHashSet<>();
+
+        /** */
+        @LoggerResource
+        private IgniteLogger log;
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, Message msg) {
+            super.sendMessage(node, msg);
+
+            Object msg0 = ((GridIoMessage)msg).message();
+
+            if (msg0 instanceof GridDhtPartitionsSingleMessage) {
+                if (((GridDhtPartitionsSingleMessage)msg0).exchangeId() != null) {
+                    log.info("Partitions message: " + msg0.getClass().getSimpleName());
+
+                    partSingleMsgs.add((GridDhtPartitionsSingleMessage) msg0);
+                }
+            }
+            else if (msg0 instanceof GridDhtPartitionsFullMessage) {
+                if (((GridDhtPartitionsFullMessage)msg0).exchangeId() != null) {
+                    log.info("Partitions message: " + msg0.getClass().getSimpleName());
+
+                    partFullMsgs.add((GridDhtPartitionsFullMessage) msg0);
+                }
+            }
+        }
+
+        /**
+         * Clears all recorded messages.
+         */
+        void reset() {
+            partSingleMsgs.clear();
+            partFullMsgs.clear();
+        }
+
+        /**
+         * @return Sent partitions single messages.
+         */
+        Collection<GridDhtPartitionsSingleMessage> partitionsSingleMessages() {
+            return partSingleMsgs;
+        }
+
+        /**
+         * @return Sent partitions full messages.
+         */
+        Collection<GridDhtPartitionsFullMessage> partitionsFullMessages() {
+            return partFullMsgs;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a2a6f31f/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index 5738778..6031dcb 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -138,6 +138,8 @@ public class IgniteCacheTestSuite2 extends TestSuite {
         suite.addTest(new TestSuite(GridCacheNearPrimarySyncSelfTest.class));
         suite.addTest(new TestSuite(GridCacheColocatedPrimarySyncSelfTest.class));
 
+        suite.addTest(new TestSuite(IgniteCacheClientNodeExchangeTest.class));
+
         return suite;
     }
 }