You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/09/17 04:05:01 UTC
[55/55] [abbrv] ignite git commit: IGNITE-1171 - Attempt to fix
discovery.
IGNITE-1171 - Attempt to fix discovery.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6aa0ee16
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6aa0ee16
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6aa0ee16
Branch: refs/heads/ignite-1171
Commit: 6aa0ee16c18086ba36f8f55735ce75c105a00cb3
Parents: c01f936
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Wed Sep 16 19:03:31 2015 -0700
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Wed Sep 16 19:03:31 2015 -0700
----------------------------------------------------------------------
.../GridCachePartitionExchangeManager.java | 6 ++++
.../ignite/spi/discovery/tcp/ServerImpl.java | 31 ++++++++++++++++----
.../spi/discovery/tcp/TcpDiscoverySpi.java | 5 +++-
.../TcpDiscoveryNodeAddFinishedMessage.java | 26 +++++++++++++++-
.../distributed/CacheAffEarlySelfTest.java | 3 ++
5 files changed, 64 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/6aa0ee16/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 20340d1..d7ada1c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1412,6 +1412,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
private static final long serialVersionUID = 0L;
/** */
+ @GridToStringInclude
private AffinityTopologyVersion topVer;
/**
@@ -1432,5 +1433,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
return done;
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(AffinityReadyFuture.class, this, super.toString());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6aa0ee16/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 3e50b94..24b30c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -3186,7 +3186,9 @@ class ServerImpl extends TcpDiscoveryImpl {
if (data != null)
spi.onExchange(node.id(), node.id(), data, U.gridClassLoader());
- msg.addDiscoveryData(locNodeId, spi.collectExchangeData(node.id()));
+ // Collect discovery data the old way for older version nodes.
+ if (TcpDiscoverySpi.DISCOVERY_DATA_COLLECT_ON_FINISH.compareTo(node.version()) > 0)
+ msg.addDiscoveryData(locNodeId, spi.collectExchangeData(node.id()));
}
if (log.isDebugEnabled())
@@ -3253,9 +3255,12 @@ class ServerImpl extends TcpDiscoveryImpl {
}
// Notify outside of synchronized block.
- if (dataMap != null) {
- for (Map.Entry<UUID, Map<Integer, byte[]>> entry : dataMap.entrySet())
- spi.onExchange(node.id(), entry.getKey(), entry.getValue(), U.gridClassLoader());
+ // Notify on node added message only if joining node is an old node.
+ if (TcpDiscoverySpi.DISCOVERY_DATA_COLLECT_ON_FINISH.compareTo(node.version()) > 0) {
+ if (dataMap != null) {
+ for (Map.Entry<UUID, Map<Integer, byte[]>> entry : dataMap.entrySet())
+ spi.onExchange(node.id(), entry.getKey(), entry.getValue(), U.gridClassLoader());
+ }
}
}
@@ -3332,6 +3337,12 @@ class ServerImpl extends TcpDiscoveryImpl {
}
if (msg.verified() && !locNodeId.equals(nodeId) && spiStateCopy() == CONNECTED && fireEvt) {
+ if (TcpDiscoverySpi.DISCOVERY_DATA_COLLECT_ON_FINISH.compareTo(node.version()) <= 0) {
+ Map<Integer, byte[]> data = spi.collectExchangeData(node.id());
+
+ msg.addDiscoveryData(locNodeId, data);
+ }
+
spi.stats.onNodeJoined();
// Make sure that node with greater order will never get EVT_NODE_JOINED
@@ -3385,6 +3396,16 @@ class ServerImpl extends TcpDiscoveryImpl {
mux.notifyAll();
}
+ // Notify outside of synchronized block.
+ if (TcpDiscoverySpi.DISCOVERY_DATA_COLLECT_ON_FINISH.compareTo(node.version()) <= 0) {
+ Map<UUID, Map<Integer, byte[]>> dataMap = msg.oldNodesDiscoveryData();
+
+ if (dataMap != null) {
+ for (Map.Entry<UUID, Map<Integer, byte[]>> entry : dataMap.entrySet())
+ spi.onExchange(node.id(), entry.getKey(), entry.getValue(), U.gridClassLoader());
+ }
+ }
+
// Discovery manager must create local joined event before spiStart completes.
notifyDiscovery(EVT_NODE_JOINED, topVer, locNode);
}
@@ -5293,4 +5314,4 @@ class ServerImpl extends TcpDiscoveryImpl {
this.sock = sock;
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6aa0ee16/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 80fcc46..237e6d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -222,6 +222,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
/** Failure detection timeout feature minor version. */
final static byte FAILURE_DETECTION_MINOR_VER = 4;
+ /** */
+ public static final IgniteProductVersion DISCOVERY_DATA_COLLECT_ON_FINISH = IgniteProductVersion.fromString("1.4.0");
+
/** Failure detection timeout feature maintainance version. */
final static byte FAILURE_DETECTION_MAINT_VER = 1;
@@ -2038,4 +2041,4 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
return S.toString(SocketTimeoutObject.class, this);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6aa0ee16/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
index c6a469f..fc2a166 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
@@ -17,6 +17,7 @@
package org.apache.ignite.spi.discovery.tcp.messages;
+import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -46,6 +47,9 @@ public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractMess
@GridToStringExclude
private Map<String, Object> clientNodeAttrs;
+ /** Discovery data from old nodes. */
+ private Map<UUID, Map<Integer, byte[]>> oldNodesDiscoData;
+
/**
* Constructor.
*
@@ -56,6 +60,8 @@ public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractMess
super(creatorNodeId);
this.nodeId = nodeId;
+
+ oldNodesDiscoData = new LinkedHashMap<>();
}
/**
@@ -75,6 +81,24 @@ public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractMess
}
/**
+ * @return Discovery data from old nodes.
+ */
+ public Map<UUID, Map<Integer, byte[]>> oldNodesDiscoveryData() {
+ return oldNodesDiscoData;
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param discoData Discovery data to add.
+ */
+ public void addDiscoveryData(UUID nodeId, Map<Integer, byte[]> discoData) {
+ // Old nodes disco data may be null if message
+ // makes more than 1 pass due to stopping of the nodes in topology.
+ if (oldNodesDiscoData != null)
+ oldNodesDiscoData.put(nodeId, discoData);
+ }
+
+ /**
* @param clientDiscoData Discovery data for joined client.
*/
public void clientDiscoData(@Nullable Map<UUID, Map<Integer, byte[]>> clientDiscoData) {
@@ -101,4 +125,4 @@ public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractMess
@Override public String toString() {
return S.toString(TcpDiscoveryNodeAddFinishedMessage.class, this, "super", super.toString());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6aa0ee16/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAffEarlySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAffEarlySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAffEarlySelfTest.java
index 7f0ca11..453b6f6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAffEarlySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAffEarlySelfTest.java
@@ -37,6 +37,7 @@ import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteFutureTimeoutException;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
+import org.apache.ignite.spi.checkpoint.noop.NoopCheckpointSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -83,6 +84,8 @@ public class CacheAffEarlySelfTest extends GridCommonAbstractTest {
cfg.setMarshaller(marsh);
+ cfg.setCheckpointSpi(new NoopCheckpointSpi());
+
return cfg;
}