You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/09 10:14:18 UTC

incubator-ignite git commit: ignite-999 ignore duplicated discovery custom messages

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-999 [created] 80c6cf0b3


ignite-999 ignore duplicated discovery custom messages


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/80c6cf0b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/80c6cf0b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/80c6cf0b

Branch: refs/heads/ignite-999
Commit: 80c6cf0b3d763e3fcfab37653de3f2c6dddf30e8
Parents: 14bb076
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jun 9 11:14:09 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Jun 9 11:14:09 2015 +0300

----------------------------------------------------------------------
 .../discovery/DiscoveryCustomMessage.java       |  6 ++++
 .../discovery/GridDiscoveryManager.java         | 32 ++++++++++++++++++++
 .../cache/DynamicCacheChangeBatch.java          | 19 +++++++++---
 .../continuous/AbstractContinuousMessage.java   |  9 ++++++
 .../DataStreamerMultinodeCreateCacheTest.java   |  4 +--
 5 files changed, 63 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/80c6cf0b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
index 693bbef..401486d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.managers.discovery;
 
 import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.lang.*;
 import org.jetbrains.annotations.*;
 
 import java.io.*;
@@ -27,6 +28,11 @@ import java.io.*;
  */
 public interface DiscoveryCustomMessage extends Serializable {
     /**
+     * @return Unique custom message ID.
+     */
+    public IgniteUuid id();
+
+    /**
      * Whether or not minor version of topology should be increased on message receive.
      *
      * @return {@code true} if minor topology version should be increased.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/80c6cf0b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 142dbaa..71fbc61 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -178,6 +178,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     /** */
     private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
 
+    /** Received custom messages history. */
+    private final ArrayDeque<IgniteUuid> rcvdCustomMsgs = new ArrayDeque<>();
+
     /** @param ctx Context. */
     public GridDiscoveryManager(GridKernalContext ctx) {
         super(ctx, ctx.config().getDiscoverySpi());
@@ -359,6 +362,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                 DiscoveryCustomMessage customMsg = spiCustomMsg == null ? null
                     : ((CustomMessageWrapper)spiCustomMsg).delegate();
 
+                if (skipMessage(type, customMsg))
+                    return;
+
                 final ClusterNode locNode = localNode();
 
                 if (snapshots != null)
@@ -515,6 +521,32 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     }
 
     /**
+     * @param type Message type.
+     * @param customMsg Custom message.
+     * @return {@code True} if should not process message.
+     */
+    private boolean skipMessage(int type, @Nullable DiscoveryCustomMessage customMsg) {
+        if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) {
+            assert customMsg != null && customMsg.id() != null : customMsg;
+
+            if (rcvdCustomMsgs.contains(customMsg.id())) {
+                if (log.isDebugEnabled())
+                    log.debug("Received duplicated custom message, will ignore [msg=" + customMsg + "]");
+
+                return true;
+            }
+
+            rcvdCustomMsgs.addLast(customMsg.id());
+
+            while (rcvdCustomMsgs.size() > DISCOVERY_HISTORY_SIZE)
+                rcvdCustomMsgs.pollFirst();
+        }
+
+        return false;
+    }
+
+    /**
+     * @param msgCls Message class.
      * @param lsnr Custom event listener.
      */
     public <T extends DiscoveryCustomMessage> void setCustomEventListener(Class<T> msgCls, CustomEventListener<T> lsnr) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/80c6cf0b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
index 5fcd0e2..dfc39c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache;
 import org.apache.ignite.internal.managers.discovery.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
 import org.jetbrains.annotations.*;
 
 import java.util.*;
@@ -39,6 +40,9 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
     @GridToStringInclude
     private Map<String, Map<UUID, Boolean>> clientNodes;
 
+    /** Custom message ID. */
+    private IgniteUuid id = IgniteUuid.randomUuid();
+
     /**
      * @param reqs Requests.
      */
@@ -48,6 +52,11 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
         this.reqs = reqs;
     }
 
+    /** {@inheritDoc} */
+    @Override public IgniteUuid id() {
+        return id;
+    }
+
     /**
      * @return Collection of change requests.
      */
@@ -70,11 +79,6 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
     }
 
     /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(DynamicCacheChangeBatch.class, this);
-    }
-
-    /** {@inheritDoc} */
     @Override public boolean incrementMinorTopologyVersion() {
         return true;
     }
@@ -88,4 +92,9 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
     @Override public boolean isMutable() {
         return false;
     }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(DynamicCacheChangeBatch.class, this);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/80c6cf0b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java
index f375777..91768a6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.continuous;
 
 import org.apache.ignite.internal.managers.discovery.*;
+import org.apache.ignite.lang.*;
 
 import java.util.*;
 
@@ -28,6 +29,9 @@ public abstract class AbstractContinuousMessage implements DiscoveryCustomMessag
     /** Routine ID. */
     protected final UUID routineId;
 
+    /** Custom message ID. */
+    private final IgniteUuid id = IgniteUuid.randomUuid();
+
     /**
      * @param id Id.
      */
@@ -35,6 +39,11 @@ public abstract class AbstractContinuousMessage implements DiscoveryCustomMessag
         routineId = id;
     }
 
+    /** {@inheritDoc} */
+    @Override public IgniteUuid id() {
+        return id;
+    }
+
     /**
      * @return Routine ID.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/80c6cf0b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultinodeCreateCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultinodeCreateCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultinodeCreateCacheTest.java
index 2d19d6f..d258a33 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultinodeCreateCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultinodeCreateCacheTest.java
@@ -42,8 +42,8 @@ public class DataStreamerMultinodeCreateCacheTest extends GridCommonAbstractTest
 
         ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
 
-        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setSocketTimeout(5000);
-        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setAckTimeout(5000);
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setSocketTimeout(50);
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setAckTimeout(50);
 
         return cfg;
     }