You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/10 11:46:09 UTC
[39/50] incubator-ignite git commit: ignite-999 ignore duplicated
discovery custom messages
ignite-999 ignore duplicated discovery custom messages
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/80c6cf0b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/80c6cf0b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/80c6cf0b
Branch: refs/heads/ignite-sprint-6
Commit: 80c6cf0b3d763e3fcfab37653de3f2c6dddf30e8
Parents: 14bb076
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jun 9 11:14:09 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Jun 9 11:14:09 2015 +0300
----------------------------------------------------------------------
.../discovery/DiscoveryCustomMessage.java | 6 ++++
.../discovery/GridDiscoveryManager.java | 32 ++++++++++++++++++++
.../cache/DynamicCacheChangeBatch.java | 19 +++++++++---
.../continuous/AbstractContinuousMessage.java | 9 ++++++
.../DataStreamerMultinodeCreateCacheTest.java | 4 +--
5 files changed, 63 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/80c6cf0b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
index 693bbef..401486d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.managers.discovery;
import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.lang.*;
import org.jetbrains.annotations.*;
import java.io.*;
@@ -27,6 +28,11 @@ import java.io.*;
*/
public interface DiscoveryCustomMessage extends Serializable {
/**
+ * @return Unique custom message ID.
+ */
+ public IgniteUuid id();
+
+ /**
* Whether or not minor version of topology should be increased on message receive.
*
* @return {@code true} if minor topology version should be increased.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/80c6cf0b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 142dbaa..71fbc61 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -178,6 +178,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/** */
private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
+ /** Received custom messages history. */
+ private final ArrayDeque<IgniteUuid> rcvdCustomMsgs = new ArrayDeque<>();
+
/** @param ctx Context. */
public GridDiscoveryManager(GridKernalContext ctx) {
super(ctx, ctx.config().getDiscoverySpi());
@@ -359,6 +362,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
DiscoveryCustomMessage customMsg = spiCustomMsg == null ? null
: ((CustomMessageWrapper)spiCustomMsg).delegate();
+ if (skipMessage(type, customMsg))
+ return;
+
final ClusterNode locNode = localNode();
if (snapshots != null)
@@ -515,6 +521,32 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
/**
+ * @param type Message type.
+ * @param customMsg Custom message.
+ * @return {@code True} if should not process message.
+ */
+ private boolean skipMessage(int type, @Nullable DiscoveryCustomMessage customMsg) {
+ if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) {
+ assert customMsg != null && customMsg.id() != null : customMsg;
+
+ if (rcvdCustomMsgs.contains(customMsg.id())) {
+ if (log.isDebugEnabled())
+ log.debug("Received duplicated custom message, will ignore [msg=" + customMsg + "]");
+
+ return true;
+ }
+
+ rcvdCustomMsgs.addLast(customMsg.id());
+
+ while (rcvdCustomMsgs.size() > DISCOVERY_HISTORY_SIZE)
+ rcvdCustomMsgs.pollFirst();
+ }
+
+ return false;
+ }
+
+ /**
+ * @param msgCls Message class.
* @param lsnr Custom event listener.
*/
public <T extends DiscoveryCustomMessage> void setCustomEventListener(Class<T> msgCls, CustomEventListener<T> lsnr) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/80c6cf0b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
index 5fcd0e2..dfc39c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache;
import org.apache.ignite.internal.managers.discovery.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
import org.jetbrains.annotations.*;
import java.util.*;
@@ -39,6 +40,9 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
@GridToStringInclude
private Map<String, Map<UUID, Boolean>> clientNodes;
+ /** Custom message ID. */
+ private IgniteUuid id = IgniteUuid.randomUuid();
+
/**
* @param reqs Requests.
*/
@@ -48,6 +52,11 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
this.reqs = reqs;
}
+ /** {@inheritDoc} */
+ @Override public IgniteUuid id() {
+ return id;
+ }
+
/**
* @return Collection of change requests.
*/
@@ -70,11 +79,6 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
}
/** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(DynamicCacheChangeBatch.class, this);
- }
-
- /** {@inheritDoc} */
@Override public boolean incrementMinorTopologyVersion() {
return true;
}
@@ -88,4 +92,9 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
@Override public boolean isMutable() {
return false;
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DynamicCacheChangeBatch.class, this);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/80c6cf0b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java
index f375777..91768a6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.continuous;
import org.apache.ignite.internal.managers.discovery.*;
+import org.apache.ignite.lang.*;
import java.util.*;
@@ -28,6 +29,9 @@ public abstract class AbstractContinuousMessage implements DiscoveryCustomMessag
/** Routine ID. */
protected final UUID routineId;
+ /** Custom message ID. */
+ private final IgniteUuid id = IgniteUuid.randomUuid();
+
/**
* @param id Id.
*/
@@ -35,6 +39,11 @@ public abstract class AbstractContinuousMessage implements DiscoveryCustomMessag
routineId = id;
}
+ /** {@inheritDoc} */
+ @Override public IgniteUuid id() {
+ return id;
+ }
+
/**
* @return Routine ID.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/80c6cf0b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultinodeCreateCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultinodeCreateCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultinodeCreateCacheTest.java
index 2d19d6f..d258a33 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultinodeCreateCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultinodeCreateCacheTest.java
@@ -42,8 +42,8 @@ public class DataStreamerMultinodeCreateCacheTest extends GridCommonAbstractTest
((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
- ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setSocketTimeout(5000);
- ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setAckTimeout(5000);
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setSocketTimeout(50);
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setAckTimeout(50);
return cfg;
}