You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2016/08/22 03:26:50 UTC
[18/28] ignite git commit: IGNITE-3688: Fixed visiblity issue in
GridCacheIoManager.idxClsHandlers.
IGNITE-3688: Fixed visiblity issue in GridCacheIoManager.idxClsHandlers.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/278633ec
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/278633ec
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/278633ec
Branch: refs/heads/master
Commit: 278633eced6d8039b5be4a18eefe6c65650aba4f
Parents: 5cf3bea
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Mon Aug 15 14:27:22 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Aug 15 14:27:22 2016 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheIoManager.java | 24 ++-
.../processors/cache/GridCacheMessage.java | 2 +-
.../dht/atomic/GridDhtAtomicCache.java | 176 ++++++++++++++-----
3 files changed, 155 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/278633ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 488a22c..78dddd3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.cache;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
@@ -97,7 +98,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
private int retryCnt;
/** Indexed class handlers. */
- private Map<Integer, IgniteBiInClosure[]> idxClsHandlers = new HashMap<>();
+ private volatile Map<Integer, IgniteBiInClosure[]> idxClsHandlers = new HashMap<>();
/** Handler registry. */
private ConcurrentMap<ListenerKey, IgniteBiInClosure<UUID, GridCacheMessage>>
@@ -241,7 +242,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
IgniteBiInClosure<UUID, GridCacheMessage> c = null;
if (msgIdx >= 0) {
- IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers.get(cacheMsg.cacheId());
+ Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = idxClsHandlers;
+
+ IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers0.get(cacheMsg.cacheId());
if (cacheClsHandlers != null)
c = cacheClsHandlers[msgIdx];
@@ -262,12 +265,19 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
append(", cacheDesc=").append(cctx.cache().cacheDescriptor(cacheMsg.cacheId())).
append(']');
+ msg0.append(U.nl()).append("Registered listeners:");
+
+ Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = idxClsHandlers;
+
+ for (Map.Entry<Integer, IgniteBiInClosure[]> e : idxClsHandlers0.entrySet())
+ msg0.append(U.nl()).append(e.getKey()).append("=").append(Arrays.toString(e.getValue()));
+
if (cctx.kernalContext().isStopping()) {
if (log.isDebugEnabled())
log.debug(msg0.toString());
}
else
- U.warn(log, msg0.toString());
+ U.error(log, msg0.toString());
return;
}
@@ -1062,12 +1072,14 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
int msgIdx = messageIndex(type);
if (msgIdx != -1) {
- IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers.get(cacheId);
+ Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = idxClsHandlers;
+
+ IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers0.get(cacheId);
if (cacheClsHandlers == null) {
cacheClsHandlers = new IgniteBiInClosure[GridCacheMessage.MAX_CACHE_MSG_LOOKUP_INDEX];
- idxClsHandlers.put(cacheId, cacheClsHandlers);
+ idxClsHandlers0.put(cacheId, cacheClsHandlers);
}
if (cacheClsHandlers[msgIdx] != null)
@@ -1076,6 +1088,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
cacheClsHandlers[msgIdx] = c;
+ idxClsHandlers = idxClsHandlers0;
+
return;
}
else {
http://git-wip-us.apache.org/repos/asf/ignite/blob/278633ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
index f99d2cd..c5407b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
@@ -50,7 +50,7 @@ public abstract class GridCacheMessage implements Message {
private static final long serialVersionUID = 0L;
/** Maximum number of cache lookup indexes. */
- public static final int MAX_CACHE_MSG_LOOKUP_INDEX = 256;
+ public static final int MAX_CACHE_MSG_LOOKUP_INDEX = 5;
/** Cache message index field name. */
public static final String CACHE_MSG_INDEX_FIELD_NAME = "CACHE_MSG_IDX";
http://git-wip-us.apache.org/repos/asf/ignite/blob/278633ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 3616082..1e45fa7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -251,61 +251,155 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
preldr.start();
- ctx.io().addHandler(ctx.cacheId(), GridNearGetRequest.class, new CI2<UUID, GridNearGetRequest>() {
- @Override public void apply(UUID nodeId, GridNearGetRequest req) {
- processNearGetRequest(nodeId, req);
- }
- });
+ ctx.io().addHandler(
+ ctx.cacheId(),
+ GridNearGetRequest.class,
+ new CI2<UUID, GridNearGetRequest>() {
+ @Override public void apply(
+ UUID nodeId,
+ GridNearGetRequest req
+ ) {
+ processNearGetRequest(
+ nodeId,
+ req);
+ }
+ });
- ctx.io().addHandler(ctx.cacheId(), GridNearSingleGetRequest.class, new CI2<UUID, GridNearSingleGetRequest>() {
- @Override public void apply(UUID nodeId, GridNearSingleGetRequest req) {
- processNearSingleGetRequest(nodeId, req);
- }
- });
+ ctx.io().addHandler(
+ ctx.cacheId(),
+ GridNearSingleGetRequest.class,
+ new CI2<UUID, GridNearSingleGetRequest>() {
+ @Override public void apply(
+ UUID nodeId,
+ GridNearSingleGetRequest req
+ ) {
+ processNearSingleGetRequest(
+ nodeId,
+ req);
+ }
+ });
- ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateRequest.class, new CI2<UUID, GridNearAtomicUpdateRequest>() {
- @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) {
- processNearAtomicUpdateRequest(nodeId, req);
- }
- });
+ ctx.io().addHandler(
+ ctx.cacheId(),
+ GridNearAtomicUpdateRequest.class,
+ new CI2<UUID, GridNearAtomicUpdateRequest>() {
+ @Override public void apply(
+ UUID nodeId,
+ GridNearAtomicUpdateRequest req
+ ) {
+ processNearAtomicUpdateRequest(
+ nodeId,
+ req);
+ }
- ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateResponse.class, new CI2<UUID, GridNearAtomicUpdateResponse>() {
- @Override public void apply(UUID nodeId, GridNearAtomicUpdateResponse res) {
- processNearAtomicUpdateResponse(nodeId, res);
- }
- });
+ @Override public String toString() {
+ return "GridNearAtomicUpdateRequest handler " +
+ "[msgIdx=" + GridNearAtomicUpdateRequest.CACHE_MSG_IDX + ']';
+ }
+ });
- ctx.io().addHandler(ctx.cacheId(), GridDhtAtomicUpdateRequest.class, new CI2<UUID, GridDhtAtomicUpdateRequest>() {
- @Override public void apply(UUID nodeId, GridDhtAtomicUpdateRequest req) {
- processDhtAtomicUpdateRequest(nodeId, req);
- }
- });
+ ctx.io().addHandler(ctx.cacheId(),
+ GridNearAtomicUpdateResponse.class,
+ new CI2<UUID, GridNearAtomicUpdateResponse>() {
+ @Override public void apply(
+ UUID nodeId,
+ GridNearAtomicUpdateResponse res
+ ) {
+ processNearAtomicUpdateResponse(
+ nodeId,
+ res);
+ }
- ctx.io().addHandler(ctx.cacheId(), GridDhtAtomicUpdateResponse.class, new CI2<UUID, GridDhtAtomicUpdateResponse>() {
- @Override public void apply(UUID nodeId, GridDhtAtomicUpdateResponse res) {
- processDhtAtomicUpdateResponse(nodeId, res);
- }
- });
+ @Override public String toString() {
+ return "GridNearAtomicUpdateResponse handler " +
+ "[msgIdx=" + GridNearAtomicUpdateResponse.CACHE_MSG_IDX + ']';
+ }
+ });
- ctx.io().addHandler(ctx.cacheId(), GridDhtAtomicDeferredUpdateResponse.class,
- new CI2<UUID, GridDhtAtomicDeferredUpdateResponse>() {
- @Override public void apply(UUID nodeId, GridDhtAtomicDeferredUpdateResponse res) {
- processDhtAtomicDeferredUpdateResponse(nodeId, res);
+ ctx.io().addHandler(
+ ctx.cacheId(),
+ GridDhtAtomicUpdateRequest.class,
+ new CI2<UUID, GridDhtAtomicUpdateRequest>() {
+ @Override public void apply(
+ UUID nodeId,
+ GridDhtAtomicUpdateRequest req
+ ) {
+ processDhtAtomicUpdateRequest(
+ nodeId,
+ req);
+ }
+
+ @Override public String toString() {
+ return "GridDhtAtomicUpdateRequest handler " +
+ "[msgIdx=" + GridDhtAtomicUpdateRequest.CACHE_MSG_IDX + ']';
}
});
- if (near == null) {
- ctx.io().addHandler(ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse>() {
- @Override public void apply(UUID nodeId, GridNearGetResponse res) {
- processNearGetResponse(nodeId, res);
+ ctx.io().addHandler(
+ ctx.cacheId(),
+ GridDhtAtomicUpdateResponse.class,
+ new CI2<UUID, GridDhtAtomicUpdateResponse>() {
+ @Override public void apply(
+ UUID nodeId,
+ GridDhtAtomicUpdateResponse res
+ ) {
+ processDhtAtomicUpdateResponse(
+ nodeId,
+ res);
+ }
+
+ @Override public String toString() {
+ return "GridDhtAtomicUpdateResponse handler " +
+ "[msgIdx=" + GridDhtAtomicUpdateResponse.CACHE_MSG_IDX + ']';
}
});
- ctx.io().addHandler(ctx.cacheId(), GridNearSingleGetResponse.class, new CI2<UUID, GridNearSingleGetResponse>() {
- @Override public void apply(UUID nodeId, GridNearSingleGetResponse res) {
- processNearSingleGetResponse(nodeId, res);
+ ctx.io().addHandler(ctx.cacheId(),
+ GridDhtAtomicDeferredUpdateResponse.class,
+ new CI2<UUID, GridDhtAtomicDeferredUpdateResponse>() {
+ @Override public void apply(
+ UUID nodeId,
+ GridDhtAtomicDeferredUpdateResponse res
+ ) {
+ processDhtAtomicDeferredUpdateResponse(
+ nodeId,
+ res);
+ }
+
+ @Override public String toString() {
+ return "GridDhtAtomicDeferredUpdateResponse handler " +
+ "[msgIdx=" + GridDhtAtomicDeferredUpdateResponse.CACHE_MSG_IDX + ']';
}
});
+
+ if (near == null) {
+ ctx.io().addHandler(
+ ctx.cacheId(),
+ GridNearGetResponse.class,
+ new CI2<UUID, GridNearGetResponse>() {
+ @Override public void apply(
+ UUID nodeId,
+ GridNearGetResponse res
+ ) {
+ processNearGetResponse(
+ nodeId,
+ res);
+ }
+ });
+
+ ctx.io().addHandler(
+ ctx.cacheId(),
+ GridNearSingleGetResponse.class,
+ new CI2<UUID, GridNearSingleGetResponse>() {
+ @Override public void apply(
+ UUID nodeId,
+ GridNearSingleGetResponse res
+ ) {
+ processNearSingleGetResponse(
+ nodeId,
+ res);
+ }
+ });
}
}