You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/09/17 04:05:01 UTC

[55/55] [abbrv] ignite git commit: IGNITE-1171 - Attempt to fix discovery.

IGNITE-1171 - Attempt to fix discovery.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6aa0ee16
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6aa0ee16
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6aa0ee16

Branch: refs/heads/ignite-1171
Commit: 6aa0ee16c18086ba36f8f55735ce75c105a00cb3
Parents: c01f936
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Wed Sep 16 19:03:31 2015 -0700
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Wed Sep 16 19:03:31 2015 -0700

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      |  6 ++++
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 31 ++++++++++++++++----
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |  5 +++-
 .../TcpDiscoveryNodeAddFinishedMessage.java     | 26 +++++++++++++++-
 .../distributed/CacheAffEarlySelfTest.java      |  3 ++
 5 files changed, 64 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6aa0ee16/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 20340d1..d7ada1c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1412,6 +1412,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         private static final long serialVersionUID = 0L;
 
         /** */
+        @GridToStringInclude
         private AffinityTopologyVersion topVer;
 
         /**
@@ -1432,5 +1433,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
             return done;
         }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(AffinityReadyFuture.class, this, super.toString());
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6aa0ee16/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 3e50b94..24b30c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -3186,7 +3186,9 @@ class ServerImpl extends TcpDiscoveryImpl {
                     if (data != null)
                         spi.onExchange(node.id(), node.id(), data, U.gridClassLoader());
 
-                    msg.addDiscoveryData(locNodeId, spi.collectExchangeData(node.id()));
+                    // Collect discovery data the old way for older version nodes.
+                    if (TcpDiscoverySpi.DISCOVERY_DATA_COLLECT_ON_FINISH.compareTo(node.version()) > 0)
+                        msg.addDiscoveryData(locNodeId, spi.collectExchangeData(node.id()));
                 }
 
                 if (log.isDebugEnabled())
@@ -3253,9 +3255,12 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
 
                 // Notify outside of synchronized block.
-                if (dataMap != null) {
-                    for (Map.Entry<UUID, Map<Integer, byte[]>> entry : dataMap.entrySet())
-                        spi.onExchange(node.id(), entry.getKey(), entry.getValue(), U.gridClassLoader());
+                // Notify on node added message only if joining node is an old node.
+                if (TcpDiscoverySpi.DISCOVERY_DATA_COLLECT_ON_FINISH.compareTo(node.version()) > 0) {
+                    if (dataMap != null) {
+                        for (Map.Entry<UUID, Map<Integer, byte[]>> entry : dataMap.entrySet())
+                            spi.onExchange(node.id(), entry.getKey(), entry.getValue(), U.gridClassLoader());
+                    }
                 }
             }
 
@@ -3332,6 +3337,12 @@ class ServerImpl extends TcpDiscoveryImpl {
             }
 
             if (msg.verified() && !locNodeId.equals(nodeId) && spiStateCopy() == CONNECTED && fireEvt) {
+                if (TcpDiscoverySpi.DISCOVERY_DATA_COLLECT_ON_FINISH.compareTo(node.version()) <= 0) {
+                    Map<Integer, byte[]> data = spi.collectExchangeData(node.id());
+
+                    msg.addDiscoveryData(locNodeId, data);
+                }
+
                 spi.stats.onNodeJoined();
 
                 // Make sure that node with greater order will never get EVT_NODE_JOINED
@@ -3385,6 +3396,16 @@ class ServerImpl extends TcpDiscoveryImpl {
                     mux.notifyAll();
                 }
 
+                // Notify outside of synchronized block.
+                if (TcpDiscoverySpi.DISCOVERY_DATA_COLLECT_ON_FINISH.compareTo(node.version()) <= 0) {
+                    Map<UUID, Map<Integer, byte[]>> dataMap = msg.oldNodesDiscoveryData();
+
+                    if (dataMap != null) {
+                        for (Map.Entry<UUID, Map<Integer, byte[]>> entry : dataMap.entrySet())
+                            spi.onExchange(node.id(), entry.getKey(), entry.getValue(), U.gridClassLoader());
+                    }
+                }
+
                 // Discovery manager must create local joined event before spiStart completes.
                 notifyDiscovery(EVT_NODE_JOINED, topVer, locNode);
             }
@@ -5293,4 +5314,4 @@ class ServerImpl extends TcpDiscoveryImpl {
             this.sock = sock;
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6aa0ee16/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 80fcc46..237e6d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -222,6 +222,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     /** Failure detection timeout feature minor version. */
     final static byte FAILURE_DETECTION_MINOR_VER = 4;
 
+    /** Minimal joining node version for which discovery data is collected on node add finished message. */
+    public static final IgniteProductVersion DISCOVERY_DATA_COLLECT_ON_FINISH = IgniteProductVersion.fromString("1.4.0");
+
     /** Failure detection timeout feature maintainance version. */
     final static byte FAILURE_DETECTION_MAINT_VER = 1;
 
@@ -2038,4 +2041,4 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
             return S.toString(SocketTimeoutObject.class, this);
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6aa0ee16/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
index c6a469f..fc2a166 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.spi.discovery.tcp.messages;
 
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -46,6 +47,9 @@ public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractMess
     @GridToStringExclude
     private Map<String, Object> clientNodeAttrs;
 
+    /** Discovery data from old nodes. */
+    private Map<UUID, Map<Integer, byte[]>> oldNodesDiscoData;
+
     /**
      * Constructor.
      *
@@ -56,6 +60,8 @@ public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractMess
         super(creatorNodeId);
 
         this.nodeId = nodeId;
+
+        oldNodesDiscoData = new LinkedHashMap<>();
     }
 
     /**
@@ -75,6 +81,24 @@ public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractMess
     }
 
     /**
+     * @return Discovery data from old nodes.
+     */
+    public Map<UUID, Map<Integer, byte[]>> oldNodesDiscoveryData() {
+        return oldNodesDiscoData;
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param discoData Discovery data to add.
+     */
+    public void addDiscoveryData(UUID nodeId, Map<Integer, byte[]> discoData) {
+        // Old nodes discovery data may be null if the message makes more than
+        // one pass because nodes in the topology are stopping.
+        if (oldNodesDiscoData != null)
+            oldNodesDiscoData.put(nodeId, discoData);
+    }
+
+    /**
      * @param clientDiscoData Discovery data for joined client.
      */
     public void clientDiscoData(@Nullable Map<UUID, Map<Integer, byte[]>> clientDiscoData) {
@@ -101,4 +125,4 @@ public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractMess
     @Override public String toString() {
         return S.toString(TcpDiscoveryNodeAddFinishedMessage.class, this, "super", super.toString());
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6aa0ee16/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAffEarlySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAffEarlySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAffEarlySelfTest.java
index 7f0ca11..453b6f6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAffEarlySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAffEarlySelfTest.java
@@ -37,6 +37,7 @@ import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteFutureTimeoutException;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
+import org.apache.ignite.spi.checkpoint.noop.NoopCheckpointSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -83,6 +84,8 @@ public class CacheAffEarlySelfTest extends GridCommonAbstractTest {
 
         cfg.setMarshaller(marsh);
 
+        cfg.setCheckpointSpi(new NoopCheckpointSpi());
+
         return cfg;
     }