You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2016/08/22 03:26:50 UTC

[18/28] ignite git commit: IGNITE-3688: Fixed visibility issue in GridCacheIoManager.idxClsHandlers.

IGNITE-3688: Fixed visibility issue in GridCacheIoManager.idxClsHandlers.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/278633ec
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/278633ec
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/278633ec

Branch: refs/heads/master
Commit: 278633eced6d8039b5be4a18eefe6c65650aba4f
Parents: 5cf3bea
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Mon Aug 15 14:27:22 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Aug 15 14:27:22 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheIoManager.java    |  24 ++-
 .../processors/cache/GridCacheMessage.java      |   2 +-
 .../dht/atomic/GridDhtAtomicCache.java          | 176 ++++++++++++++-----
 3 files changed, 155 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/278633ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 488a22c..78dddd3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -97,7 +98,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
     private int retryCnt;
 
     /** Indexed class handlers. */
-    private Map<Integer, IgniteBiInClosure[]> idxClsHandlers = new HashMap<>();
+    private volatile Map<Integer, IgniteBiInClosure[]> idxClsHandlers = new HashMap<>();
 
     /** Handler registry. */
     private ConcurrentMap<ListenerKey, IgniteBiInClosure<UUID, GridCacheMessage>>
@@ -241,7 +242,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
         IgniteBiInClosure<UUID, GridCacheMessage> c = null;
 
         if (msgIdx >= 0) {
-            IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers.get(cacheMsg.cacheId());
+            Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = idxClsHandlers;
+
+            IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers0.get(cacheMsg.cacheId());
 
             if (cacheClsHandlers != null)
                 c = cacheClsHandlers[msgIdx];
@@ -262,12 +265,19 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                 append(", cacheDesc=").append(cctx.cache().cacheDescriptor(cacheMsg.cacheId())).
                 append(']');
 
+            msg0.append(U.nl()).append("Registered listeners:");
+
+            Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = idxClsHandlers;
+
+            for (Map.Entry<Integer, IgniteBiInClosure[]> e : idxClsHandlers0.entrySet())
+                msg0.append(U.nl()).append(e.getKey()).append("=").append(Arrays.toString(e.getValue()));
+
             if (cctx.kernalContext().isStopping()) {
                 if (log.isDebugEnabled())
                     log.debug(msg0.toString());
             }
             else
-                U.warn(log, msg0.toString());
+                U.error(log, msg0.toString());
 
             return;
         }
@@ -1062,12 +1072,14 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
         int msgIdx = messageIndex(type);
 
         if (msgIdx != -1) {
-            IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers.get(cacheId);
+            Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = idxClsHandlers;
+
+            IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers0.get(cacheId);
 
             if (cacheClsHandlers == null) {
                 cacheClsHandlers = new IgniteBiInClosure[GridCacheMessage.MAX_CACHE_MSG_LOOKUP_INDEX];
 
-                idxClsHandlers.put(cacheId, cacheClsHandlers);
+                idxClsHandlers0.put(cacheId, cacheClsHandlers);
             }
 
             if (cacheClsHandlers[msgIdx] != null)
@@ -1076,6 +1088,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
             cacheClsHandlers[msgIdx] = c;
 
+            idxClsHandlers = idxClsHandlers0;
+
             return;
         }
         else {

http://git-wip-us.apache.org/repos/asf/ignite/blob/278633ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
index f99d2cd..c5407b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
@@ -50,7 +50,7 @@ public abstract class GridCacheMessage implements Message {
     private static final long serialVersionUID = 0L;
 
     /** Maximum number of cache lookup indexes. */
-    public static final int MAX_CACHE_MSG_LOOKUP_INDEX = 256;
+    public static final int MAX_CACHE_MSG_LOOKUP_INDEX = 5;
 
     /** Cache message index field name. */
     public static final String CACHE_MSG_INDEX_FIELD_NAME = "CACHE_MSG_IDX";

http://git-wip-us.apache.org/repos/asf/ignite/blob/278633ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 3616082..1e45fa7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -251,61 +251,155 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         preldr.start();
 
-        ctx.io().addHandler(ctx.cacheId(), GridNearGetRequest.class, new CI2<UUID, GridNearGetRequest>() {
-            @Override public void apply(UUID nodeId, GridNearGetRequest req) {
-                processNearGetRequest(nodeId, req);
-            }
-        });
+        ctx.io().addHandler(
+            ctx.cacheId(),
+            GridNearGetRequest.class,
+            new CI2<UUID, GridNearGetRequest>() {
+                @Override public void apply(
+                    UUID nodeId,
+                    GridNearGetRequest req
+                ) {
+                    processNearGetRequest(
+                        nodeId,
+                        req);
+                }
+            });
 
-        ctx.io().addHandler(ctx.cacheId(), GridNearSingleGetRequest.class, new CI2<UUID, GridNearSingleGetRequest>() {
-            @Override public void apply(UUID nodeId, GridNearSingleGetRequest req) {
-                processNearSingleGetRequest(nodeId, req);
-            }
-        });
+        ctx.io().addHandler(
+            ctx.cacheId(),
+            GridNearSingleGetRequest.class,
+            new CI2<UUID, GridNearSingleGetRequest>() {
+                @Override public void apply(
+                    UUID nodeId,
+                    GridNearSingleGetRequest req
+                ) {
+                    processNearSingleGetRequest(
+                        nodeId,
+                        req);
+                }
+            });
 
-        ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateRequest.class, new CI2<UUID, GridNearAtomicUpdateRequest>() {
-            @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) {
-                processNearAtomicUpdateRequest(nodeId, req);
-            }
-        });
+        ctx.io().addHandler(
+            ctx.cacheId(),
+            GridNearAtomicUpdateRequest.class,
+            new CI2<UUID, GridNearAtomicUpdateRequest>() {
+                @Override public void apply(
+                    UUID nodeId,
+                    GridNearAtomicUpdateRequest req
+                ) {
+                    processNearAtomicUpdateRequest(
+                        nodeId,
+                        req);
+                }
 
-        ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateResponse.class, new CI2<UUID, GridNearAtomicUpdateResponse>() {
-            @Override public void apply(UUID nodeId, GridNearAtomicUpdateResponse res) {
-                processNearAtomicUpdateResponse(nodeId, res);
-            }
-        });
+                @Override public String toString() {
+                    return "GridNearAtomicUpdateRequest handler " +
+                        "[msgIdx=" + GridNearAtomicUpdateRequest.CACHE_MSG_IDX + ']';
+                }
+            });
 
-        ctx.io().addHandler(ctx.cacheId(), GridDhtAtomicUpdateRequest.class, new CI2<UUID, GridDhtAtomicUpdateRequest>() {
-            @Override public void apply(UUID nodeId, GridDhtAtomicUpdateRequest req) {
-                processDhtAtomicUpdateRequest(nodeId, req);
-            }
-        });
+        ctx.io().addHandler(ctx.cacheId(),
+            GridNearAtomicUpdateResponse.class,
+            new CI2<UUID, GridNearAtomicUpdateResponse>() {
+                @Override public void apply(
+                    UUID nodeId,
+                    GridNearAtomicUpdateResponse res
+                ) {
+                    processNearAtomicUpdateResponse(
+                        nodeId,
+                        res);
+                }
 
-        ctx.io().addHandler(ctx.cacheId(), GridDhtAtomicUpdateResponse.class, new CI2<UUID, GridDhtAtomicUpdateResponse>() {
-            @Override public void apply(UUID nodeId, GridDhtAtomicUpdateResponse res) {
-                processDhtAtomicUpdateResponse(nodeId, res);
-            }
-        });
+                @Override public String toString() {
+                    return "GridNearAtomicUpdateResponse handler " +
+                        "[msgIdx=" + GridNearAtomicUpdateResponse.CACHE_MSG_IDX + ']';
+                }
+            });
 
-        ctx.io().addHandler(ctx.cacheId(), GridDhtAtomicDeferredUpdateResponse.class,
-            new CI2<UUID, GridDhtAtomicDeferredUpdateResponse>() {
-                @Override public void apply(UUID nodeId, GridDhtAtomicDeferredUpdateResponse res) {
-                    processDhtAtomicDeferredUpdateResponse(nodeId, res);
+        ctx.io().addHandler(
+            ctx.cacheId(),
+            GridDhtAtomicUpdateRequest.class,
+            new CI2<UUID, GridDhtAtomicUpdateRequest>() {
+                @Override public void apply(
+                    UUID nodeId,
+                    GridDhtAtomicUpdateRequest req
+                ) {
+                    processDhtAtomicUpdateRequest(
+                        nodeId,
+                        req);
+                }
+
+                @Override public String toString() {
+                    return "GridDhtAtomicUpdateRequest handler " +
+                        "[msgIdx=" + GridDhtAtomicUpdateRequest.CACHE_MSG_IDX + ']';
                 }
             });
 
-        if (near == null) {
-            ctx.io().addHandler(ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse>() {
-                @Override public void apply(UUID nodeId, GridNearGetResponse res) {
-                    processNearGetResponse(nodeId, res);
+        ctx.io().addHandler(
+            ctx.cacheId(),
+            GridDhtAtomicUpdateResponse.class,
+            new CI2<UUID, GridDhtAtomicUpdateResponse>() {
+                @Override public void apply(
+                    UUID nodeId,
+                    GridDhtAtomicUpdateResponse res
+                ) {
+                    processDhtAtomicUpdateResponse(
+                        nodeId,
+                        res);
+                }
+
+                @Override public String toString() {
+                    return "GridDhtAtomicUpdateResponse handler " +
+                        "[msgIdx=" + GridDhtAtomicUpdateResponse.CACHE_MSG_IDX + ']';
                 }
             });
 
-            ctx.io().addHandler(ctx.cacheId(), GridNearSingleGetResponse.class, new CI2<UUID, GridNearSingleGetResponse>() {
-                @Override public void apply(UUID nodeId, GridNearSingleGetResponse res) {
-                    processNearSingleGetResponse(nodeId, res);
+        ctx.io().addHandler(ctx.cacheId(),
+            GridDhtAtomicDeferredUpdateResponse.class,
+            new CI2<UUID, GridDhtAtomicDeferredUpdateResponse>() {
+                @Override public void apply(
+                    UUID nodeId,
+                    GridDhtAtomicDeferredUpdateResponse res
+                ) {
+                    processDhtAtomicDeferredUpdateResponse(
+                        nodeId,
+                        res);
+                }
+
+                @Override public String toString() {
+                    return "GridDhtAtomicDeferredUpdateResponse handler " +
+                        "[msgIdx=" + GridDhtAtomicDeferredUpdateResponse.CACHE_MSG_IDX + ']';
                 }
             });
+
+        if (near == null) {
+            ctx.io().addHandler(
+                ctx.cacheId(),
+                GridNearGetResponse.class,
+                new CI2<UUID, GridNearGetResponse>() {
+                    @Override public void apply(
+                        UUID nodeId,
+                        GridNearGetResponse res
+                    ) {
+                        processNearGetResponse(
+                            nodeId,
+                            res);
+                    }
+                });
+
+            ctx.io().addHandler(
+                ctx.cacheId(),
+                GridNearSingleGetResponse.class,
+                new CI2<UUID, GridNearSingleGetResponse>() {
+                    @Override public void apply(
+                        UUID nodeId,
+                        GridNearSingleGetResponse res
+                    ) {
+                        processNearSingleGetResponse(
+                            nodeId,
+                            res);
+                    }
+                });
         }
     }