You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/03 12:45:59 UTC
ignite git commit: Pass io policy in GridMessageListener.
Repository: ignite
Updated Branches:
refs/heads/master 7db925c1c -> bdd31af76
Pass io policy in GridMessageListener.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bdd31af7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bdd31af7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bdd31af7
Branch: refs/heads/master
Commit: bdd31af760f745c9ab05f75d100649a30dfe4f1e
Parents: 7db925c
Author: sboikov <sb...@gridgain.com>
Authored: Mon Jul 3 15:45:04 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Jul 3 15:45:04 2017 +0300
----------------------------------------------------------------------
.../checkpoint/GridCheckpointManager.java | 2 +-
.../managers/communication/GridIoManager.java | 10 +--
.../communication/GridMessageListener.java | 3 +-
.../deployment/GridDeploymentCommunication.java | 4 +-
.../eventstorage/GridEventStorageManager.java | 4 +-
.../processors/cache/GridCacheIoManager.java | 77 +++++++++++---------
.../cache/binary/BinaryMetadataTransport.java | 4 +-
.../cache/transactions/IgniteTxManager.java | 2 +-
.../processors/cluster/ClusterProcessor.java | 2 +-
.../continuous/GridContinuousProcessor.java | 4 +-
.../datastreamer/DataStreamProcessor.java | 2 +-
.../datastreamer/DataStreamerImpl.java | 2 +-
.../processors/igfs/IgfsDataManager.java | 2 +-
.../igfs/IgfsFragmentizerManager.java | 4 +-
.../processors/job/GridJobProcessor.java | 8 +-
.../GridMarshallerMappingProcessor.java | 4 +-
.../processors/query/GridQueryProcessor.java | 2 +-
.../handlers/task/GridTaskCommandHandler.java | 4 +-
.../processors/task/GridTaskProcessor.java | 6 +-
.../jobstealing/JobStealingCollisionSpi.java | 2 +-
...idCommunicationManagerListenersSelfTest.java | 2 +-
.../GridCommunicationSendMessageSelfTest.java | 2 +-
.../cache/GridCachePartitionedGetSelfTest.java | 2 +-
...lerCacheClientRequestsMappingOnMissTest.java | 6 +-
...naryObjectMetadataExchangeMultinodeTest.java | 6 +-
...DeadlockDetectionMessageMarshallingTest.java | 2 +-
.../communication/GridIoManagerBenchmark.java | 4 +-
.../communication/GridIoManagerBenchmark0.java | 12 +--
.../communication/GridCacheMessageSelfTest.java | 2 +-
.../testframework/GridSpiTestContext.java | 5 +-
.../hadoop/shuffle/HadoopShuffle.java | 2 +-
.../query/h2/opt/GridH2IndexBase.java | 2 +-
.../query/h2/twostep/GridMapQueryExecutor.java | 2 +-
.../h2/twostep/GridReduceQueryExecutor.java | 2 +-
34 files changed, 106 insertions(+), 93 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
index 782ee5e..aae1a3b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
@@ -464,7 +464,7 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
* @param msg Received message.
*/
@SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
GridCheckpointRequest req = (GridCheckpointRequest)msg;
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index a1ddaf4..3ff44a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -337,7 +337,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
log.debug(startInfo());
addMessageListener(GridTopic.TOPIC_IO_TEST, new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
ClusterNode node = ctx.discovery().node(nodeId);
if (node == null)
@@ -1553,7 +1553,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
CUR_PLC.set(plc);
try {
- lsnr.onMessage(nodeId, msg);
+ lsnr.onMessage(nodeId, msg, plc);
}
finally {
if (change)
@@ -2323,14 +2323,14 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @param nodeId Node ID.
* @param msg Message.
*/
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
GridMessageListener[] arr0 = arr;
if (arr0 == null)
return;
for (GridMessageListener l : arr0)
- l.onMessage(nodeId, msg);
+ l.onMessage(nodeId, msg, plc);
}
/**
@@ -2430,7 +2430,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
/** {@inheritDoc} */
@SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter", "ConstantConditions",
"OverlyStrongTypeCast"})
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
if (!(msg instanceof GridIoUserMessage)) {
U.error(log, "Received unknown message (potentially fatal problem): " + msg);
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridMessageListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridMessageListener.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridMessageListener.java
index 3993591..c7de57c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridMessageListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridMessageListener.java
@@ -30,6 +30,7 @@ public interface GridMessageListener extends EventListener {
* @param nodeId ID of node that sent the message. Note that may have already
* left topology by the time this message is received.
* @param msg Message received.
+ * @param plc Message policy (pool).
*/
- public void onMessage(UUID nodeId, Object msg);
+ public void onMessage(UUID nodeId, Object msg, byte plc);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
index 23d186a..2a5f7ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
@@ -83,7 +83,7 @@ class GridDeploymentCommunication {
this.log = log.getLogger(getClass());
peerLsnr = new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
processDeploymentRequest(nodeId, msg);
}
};
@@ -422,7 +422,7 @@ class GridDeploymentCommunication {
};
GridMessageListener resLsnr = new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
assert nodeId != null;
assert msg != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
index bd43e43..7836004 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
@@ -1008,7 +1008,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
GridMessageListener resLsnr = new GridMessageListener() {
@SuppressWarnings("deprecation")
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
assert nodeId != null;
assert msg != null;
@@ -1185,7 +1185,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
*/
private class RequestListener implements GridMessageListener {
/** {@inheritDoc} */
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
assert nodeId != null;
assert msg != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index a920bd0..49cfcdd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -155,7 +155,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
/** Message listener. */
private GridMessageListener lsnr = new GridMessageListener() {
- @Override public void onMessage(final UUID nodeId, final Object msg) {
+ @Override public void onMessage(final UUID nodeId, final Object msg, final byte plc) {
if (log.isDebugEnabled())
log.debug("Received unordered cache communication message [nodeId=" + nodeId +
", locId=" + cctx.localNodeId() + ", msg=" + msg + ']');
@@ -196,7 +196,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
@Override public void apply(IgniteInternalFuture<?> fut) {
cctx.kernalContext().closure().runLocalSafe(new Runnable() {
@Override public void run() {
- handleMessage(nodeId, cacheMsg);
+ handleMessage(nodeId, cacheMsg, plc);
}
});
}
@@ -269,40 +269,48 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
log.debug(msg0.toString());
}
- handleMessage(nodeId, cacheMsg);
+ handleMessage(nodeId, cacheMsg, plc);
}
};
if (stripe >= 0)
cctx.kernalContext().getStripedExecutorService().execute(stripe, c);
- else
- cctx.kernalContext().closure().runLocalSafe(c);
+ else {
+ try {
+ cctx.kernalContext().pools().poolForPolicy(plc).execute(c);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(cacheMsg.messageLogger(cctx), "Failed to get pool for policy: " + plc, e);
+ }
+ }
}
});
return;
}
- handleMessage(nodeId, cacheMsg);
+ handleMessage(nodeId, cacheMsg, plc);
}
};
/**
* @param nodeId Sender node ID.
* @param cacheMsg Message.
+ * @param plc Message policy.
*/
@SuppressWarnings("unchecked")
- private void handleMessage(UUID nodeId, GridCacheMessage cacheMsg) {
- handleMessage(nodeId, cacheMsg, cacheMsg.cacheGroupMessage() ? grpHandlers : cacheHandlers);
+ private void handleMessage(UUID nodeId, GridCacheMessage cacheMsg, byte plc) {
+ handleMessage(nodeId, cacheMsg, cacheMsg.cacheGroupMessage() ? grpHandlers : cacheHandlers, plc);
}
/**
* @param nodeId Sender node ID.
* @param cacheMsg Message.
* @param msgHandlers Message handlers.
+ * @param plc Message policy.
*/
@SuppressWarnings("unchecked")
- private void handleMessage(UUID nodeId, GridCacheMessage cacheMsg, MessageHandlers msgHandlers) {
+ private void handleMessage(UUID nodeId, GridCacheMessage cacheMsg, MessageHandlers msgHandlers, byte plc) {
Lock lock = rw.readLock();
lock.lock();
@@ -356,7 +364,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
return;
}
- onMessage0(nodeId, cacheMsg, c);
+ onMessage0(nodeId, cacheMsg, c, plc);
}
finally {
lock.unlock();
@@ -517,10 +525,11 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
* @param nodeId Node ID.
* @param cacheMsg Cache message.
* @param c Handler closure.
+ * @param plc Message policy.
*/
@SuppressWarnings({"unchecked", "ConstantConditions", "ThrowableResultOfMethodCallIgnored"})
private void onMessage0(final UUID nodeId, final GridCacheMessage cacheMsg,
- final IgniteBiInClosure<UUID, GridCacheMessage> c) {
+ final IgniteBiInClosure<UUID, GridCacheMessage> c, byte plc) {
try {
if (stopping) {
if (log.isDebugEnabled())
@@ -536,7 +545,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
unmarshall(nodeId, cacheMsg);
if (cacheMsg.classError() != null)
- processFailedMessage(nodeId, cacheMsg, c);
+ processFailedMessage(nodeId, cacheMsg, c, plc);
else
processMessage(nodeId, cacheMsg, c);
}
@@ -669,15 +678,17 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
*
* @param nodeId Node ID.
* @param msg Message.
+ * @param c Closure.
+ * @param plc Message policy.
* @throws IgniteCheckedException If failed.
*/
- private void processFailedMessage(UUID nodeId, GridCacheMessage msg, IgniteBiInClosure<UUID, GridCacheMessage> c)
+ private void processFailedMessage(UUID nodeId,
+ GridCacheMessage msg,
+ IgniteBiInClosure<UUID, GridCacheMessage> c,
+ byte plc)
throws IgniteCheckedException {
assert msg != null;
- GridCacheContext ctx = msg instanceof GridCacheIdMessage ?
- cctx.cacheContext(((GridCacheIdMessage)msg).cacheId()) : null;
-
switch (msg.directType()) {
case 30: {
GridDhtLockRequest req = (GridDhtLockRequest)msg;
@@ -688,9 +699,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
req.futureId(),
req.miniId(),
0,
- ctx.deploymentEnabled());
+ false);
- sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+ sendResponseOnFailedMessage(nodeId, res, cctx, plc);
}
break;
@@ -723,7 +734,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
res.onError(req.classError());
- sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+ sendResponseOnFailedMessage(nodeId, res, cctx, plc);
if (req.nearNodeId() != null) {
GridDhtAtomicNearResponse nearRes = new GridDhtAtomicNearResponse(req.cacheId(),
@@ -734,7 +745,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
nearRes.errors(new UpdateErrors(req.classError()));
- sendResponseOnFailedMessage(req.nearNodeId(), nearRes, cctx, ctx.ioPolicy());
+ sendResponseOnFailedMessage(req.nearNodeId(), nearRes, cctx, plc);
}
}
@@ -753,7 +764,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
res.error(req.classError());
- sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+ sendResponseOnFailedMessage(nodeId, res, cctx, plc);
}
break;
@@ -770,7 +781,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
res.error(req.classError());
- sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+ sendResponseOnFailedMessage(nodeId, res, cctx, plc);
}
break;
@@ -793,7 +804,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
res.error(req.classError());
- sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+ sendResponseOnFailedMessage(nodeId, res, cctx, plc);
}
break;
@@ -831,7 +842,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
null,
false);
- sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+ sendResponseOnFailedMessage(nodeId, res, cctx, plc);
}
break;
@@ -872,7 +883,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
cctx.node(nodeId),
TOPIC_CACHE.topic(QUERY_TOPIC_PREFIX, nodeId, req.id()),
res,
- ctx.ioPolicy(),
+ plc,
Long.MAX_VALUE);
}
@@ -897,7 +908,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
res.error(req.classError());
- sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+ sendResponseOnFailedMessage(nodeId, res, cctx, plc);
}
break;
@@ -935,7 +946,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
res.error(req.classError());
- sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+ sendResponseOnFailedMessage(nodeId, res, cctx, plc);
}
break;
@@ -953,7 +964,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
res.error(req.classError());
- sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+ sendResponseOnFailedMessage(nodeId, res, cctx, plc);
}
break;
@@ -971,7 +982,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
res.error(req.classError());
- sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+ sendResponseOnFailedMessage(nodeId, res, cctx, plc);
}
break;
@@ -987,7 +998,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
res.onError(req.classError());
- sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+ sendResponseOnFailedMessage(nodeId, res, cctx, plc);
if (req.nearNodeId() != null) {
GridDhtAtomicNearResponse nearRes = new GridDhtAtomicNearResponse(req.cacheId(),
@@ -998,7 +1009,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
nearRes.errors(new UpdateErrors(req.classError()));
- sendResponseOnFailedMessage(req.nearNodeId(), nearRes, cctx, ctx.ioPolicy());
+ sendResponseOnFailedMessage(req.nearNodeId(), nearRes, cctx, plc);
}
}
@@ -1540,7 +1551,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
/** {@inheritDoc} */
@SuppressWarnings({"CatchGenericClass", "unchecked"})
- @Override public void onMessage(final UUID nodeId, Object msg) {
+ @Override public void onMessage(final UUID nodeId, Object msg, byte plc) {
if (log.isDebugEnabled())
log.debug("Received cache ordered message [nodeId=" + nodeId + ", msg=" + msg + ']');
@@ -1551,7 +1562,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
try {
GridCacheMessage cacheMsg = (GridCacheMessage)msg;
- onMessage0(nodeId, cacheMsg, c);
+ onMessage0(nodeId, cacheMsg, c, plc);
}
finally {
lock.unlock();
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
index 00c760f..5fd7295 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
@@ -561,7 +561,7 @@ final class BinaryMetadataTransport {
}
/** {@inheritDoc} */
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
assert msg instanceof MetadataRequestMessage : msg;
MetadataRequestMessage msg0 = (MetadataRequestMessage) msg;
@@ -606,7 +606,7 @@ final class BinaryMetadataTransport {
private final class MetadataResponseListener implements GridMessageListener {
/** {@inheritDoc} */
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
assert msg instanceof MetadataResponseMessage : msg;
MetadataResponseMessage msg0 = (MetadataResponseMessage) msg;
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index a9aa13d..3a3b766 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -2433,7 +2433,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
private class DeadlockDetectionListener implements GridMessageListener {
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
GridCacheMessage cacheMsg = (GridCacheMessage)msg;
Throwable err = null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
index ed83650..0bd2370 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
@@ -146,7 +146,7 @@ public class ClusterProcessor extends GridProcessorAdapter {
EVT_NODE_FAILED, EVT_NODE_LEFT);
ctx.io().addMessageListener(TOPIC_INTERNAL_DIAGNOSTIC, new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
if (msg instanceof IgniteDiagnosticMessage) {
IgniteDiagnosticMessage msg0 = (IgniteDiagnosticMessage)msg;
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index f641399..b1d3442 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -289,7 +289,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
});
ctx.io().addMessageListener(TOPIC_CONTINUOUS, new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object obj) {
+ @Override public void onMessage(UUID nodeId, Object obj, byte plc) {
GridContinuousMessage msg = (GridContinuousMessage)obj;
if (msg.data() == null && msg.dataBytes() != null) {
@@ -771,7 +771,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
private void registerMessageListener(GridContinuousHandler hnd) {
if (hnd.orderedTopic() != null) {
ctx.io().addMessageListener(hnd.orderedTopic(), new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object obj) {
+ @Override public void onMessage(UUID nodeId, Object obj, byte plc) {
GridContinuousMessage msg = (GridContinuousMessage)obj;
// Only notification can be ordered.
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index c52f7ac..31ae1e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -82,7 +82,7 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
if (!ctx.clientNode()) {
ctx.io().addMessageListener(TOPIC_DATASTREAM, new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
assert msg instanceof DataStreamerRequest;
processRequest(nodeId, (DataStreamerRequest)msg);
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 40988d3..ae441de 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -319,7 +319,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
topic = TOPIC_DATASTREAM.topic(IgniteUuid.fromUuid(ctx.localNodeId()));
ctx.io().addMessageListener(topic, new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
assert msg instanceof DataStreamerResponse;
DataStreamerResponse res = (DataStreamerResponse)msg;
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index 8ae6db8..a4ea337 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -158,7 +158,7 @@ public class IgfsDataManager extends IgfsManager {
topic = F.isEmpty(igfsName) ? TOPIC_IGFS : TOPIC_IGFS.topic(igfsName);
igfsCtx.kernalContext().io().addMessageListener(topic, new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
if (msg instanceof IgfsBlocksMessage)
processBlocksMessage(nodeId, (IgfsBlocksMessage)msg);
else if (msg instanceof IgfsAckMessage)
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
index 7797f89..0c75bc3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
@@ -453,7 +453,7 @@ public class IgfsFragmentizerManager extends IgfsManager {
}
/** {@inheritDoc} */
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
if (msg instanceof IgfsFragmentizerResponse) {
IgfsFragmentizerResponse res = (IgfsFragmentizerResponse)msg;
@@ -673,7 +673,7 @@ public class IgfsFragmentizerManager extends IgfsManager {
}
/** {@inheritDoc} */
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
if (msg instanceof IgfsFragmentizerRequest ||
msg instanceof IgfsSyncMessage) {
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index 408396a..5ae4da6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -447,7 +447,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
final Condition cond = lock.newCondition();
GridMessageListener msgLsnr = new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
String err = null;
GridJobSiblingsResponse res = null;
@@ -1856,7 +1856,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
*/
private class JobSessionListener implements GridMessageListener {
/** {@inheritDoc} */
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
assert nodeId != null;
assert msg != null;
@@ -1872,7 +1872,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
*/
private class JobCancelListener implements GridMessageListener {
/** {@inheritDoc} */
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
assert nodeId != null;
assert msg != null;
@@ -1890,7 +1890,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
*/
private class JobExecutionListener implements GridMessageListener {
/** {@inheritDoc} */
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
assert nodeId != null;
assert msg != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
index 8de6c49..2543042 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
@@ -168,7 +168,7 @@ public class GridMarshallerMappingProcessor extends GridProcessorAdapter {
}
/** {@inheritDoc} */
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
assert msg instanceof MissingMappingRequestMessage : msg;
MissingMappingRequestMessage msg0 = (MissingMappingRequestMessage) msg;
@@ -200,7 +200,7 @@ public class GridMarshallerMappingProcessor extends GridProcessorAdapter {
*/
private final class MissingMappingResponseListener implements GridMessageListener {
/** {@inheritDoc} */
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
assert msg instanceof MissingMappingResponseMessage : msg;
MissingMappingResponseMessage msg0 = (MissingMappingResponseMessage) msg;
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index dd07584..d55a129 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -207,7 +207,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
valCtx = new CacheQueryObjectValueContext(ctx);
ioLsnr = new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
if (msg instanceof SchemaOperationStatusMessage) {
SchemaOperationStatusMessage msg0 = (SchemaOperationStatusMessage)msg;
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
index 99ba335..d9b49cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
@@ -109,7 +109,7 @@ public class GridTaskCommandHandler extends GridRestCommandHandlerAdapter {
super(ctx);
ctx.io().addMessageListener(TOPIC_REST, new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
if (!(msg instanceof GridTaskResultRequest)) {
U.warn(log, "Received unexpected message instead of task result request: " + msg);
@@ -425,7 +425,7 @@ public class GridTaskCommandHandler extends GridRestCommandHandlerAdapter {
final Condition cond = lock.newCondition();
GridMessageListener msgLsnr = new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
String err = null;
GridTaskResultResponse res = null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index 6ae97dd..d7a022e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -1282,7 +1282,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
}
/** {@inheritDoc} */
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
if (msg instanceof GridJobExecuteResponse)
processJobExecuteResponse(nodeId, (GridJobExecuteResponse)msg);
else if (jobResOnly)
@@ -1326,7 +1326,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
*/
private class JobSiblingsMessageListener implements GridMessageListener {
/** {@inheritDoc} */
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
if (!(msg instanceof GridJobSiblingsRequest)) {
U.warn(log, "Received unexpected message instead of siblings request: " + msg);
@@ -1398,7 +1398,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
*/
private class TaskCancelMessageListener implements GridMessageListener {
/** {@inheritDoc} */
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
assert msg != null;
if (!(msg instanceof GridTaskCancelRequest)) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java
index 6f2c099..39a8e8c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java
@@ -648,7 +648,7 @@ public class JobStealingCollisionSpi extends IgniteSpiAdapter implements Collisi
spiCtx.addMessageListener(
msgLsnr = new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
MessageInfo info = rcvMsgMap.get(nodeId);
if (info == null) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationManagerListenersSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationManagerListenersSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationManagerListenersSelfTest.java
index 1738da8..03b7921 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationManagerListenersSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationManagerListenersSelfTest.java
@@ -159,7 +159,7 @@ public class GridCommunicationManagerListenersSelfTest extends GridCommonAbstrac
}
/** {@inheritDoc} */
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
// No-op.
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
index 29b7847..3563c77 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
@@ -143,7 +143,7 @@ public class GridCommunicationSendMessageSelfTest extends GridCommonAbstractTest
final CountDownLatch latch = new CountDownLatch(SAMPLE_CNT);
mgr1.addMessageListener(topic, new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
if (msgCls.isInstance(msg))
latch.countDown();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java
index 308f2b4..d5988f1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java
@@ -224,7 +224,7 @@ public class GridCachePartitionedGetSelfTest extends GridCommonAbstractTest {
((IgniteKernal)g).context().io().addMessageListener(
TOPIC_CACHE,
new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
info("Received message from node [nodeId=" + nodeId + ", msg=" + msg + ']');
if (msg instanceof GridNearSingleGetRequest) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingOnMissTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingOnMissTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingOnMissTest.java
index f5f2512..057b970 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingOnMissTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingOnMissTest.java
@@ -300,10 +300,10 @@ public class IgniteMarshallerCacheClientRequestsMappingOnMissTest extends GridCo
final GridMessageListener delegate = lsnrs[GridTopic.TOPIC_MAPPING_MARSH.ordinal()];
GridMessageListener wrapper = new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
mappingReqsCounter.incrementAndGet();
- delegate.onMessage(nodeId, msg);
+ delegate.onMessage(nodeId, msg, plc);
}
};
@@ -321,7 +321,7 @@ public class IgniteMarshallerCacheClientRequestsMappingOnMissTest extends GridCo
ioMgr.removeMessageListener(GridTopic.TOPIC_MAPPING_MARSH);
ioMgr.addMessageListener(GridTopic.TOPIC_MAPPING_MARSH, new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
new Thread(new Runnable() {
@Override public void run() {
mappingReqsCounter.incrementAndGet();
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectMetadataExchangeMultinodeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectMetadataExchangeMultinodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectMetadataExchangeMultinodeTest.java
index 9370e27..6ff703e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectMetadataExchangeMultinodeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectMetadataExchangeMultinodeTest.java
@@ -396,7 +396,7 @@ public class GridCacheBinaryObjectMetadataExchangeMultinodeTest extends GridComm
ioMgr.removeMessageListener(GridTopic.TOPIC_METADATA_REQ);
ioMgr.addMessageListener(GridTopic.TOPIC_METADATA_REQ, new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
new Thread(new Runnable() {
@Override public void run() {
metadataReqsCounter.incrementAndGet();
@@ -416,9 +416,9 @@ public class GridCacheBinaryObjectMetadataExchangeMultinodeTest extends GridComm
final GridMessageListener delegate = lsnrs[GridTopic.TOPIC_METADATA_REQ.ordinal()];
GridMessageListener wrapper = new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
metadataReqsCounter.incrementAndGet();
- delegate.onMessage(nodeId, msg);
+ delegate.onMessage(nodeId, msg, plc);
}
};
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionMessageMarshallingTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionMessageMarshallingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionMessageMarshallingTest.java
index 9126053..1a48cec 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionMessageMarshallingTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionMessageMarshallingTest.java
@@ -76,7 +76,7 @@ public class TxDeadlockDetectionMessageMarshallingTest extends GridCommonAbstrac
final AtomicBoolean res = new AtomicBoolean();
clientCtx.gridIO().addMessageListener(TOPIC, new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
if (msg instanceof TxLocksResponse) {
try {
((TxLocksResponse)msg).finishUnmarshal(clientCtx, clientCtx.deploy().globalLoader());
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java
index 03bbb00..5671158 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java
@@ -240,7 +240,7 @@ public class GridIoManagerBenchmark {
GridMessageListener lsnr = new GridMessageListener() {
private ClusterNode node;
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
if (node == null)
node = g.context().discovery().node(nodeId);
@@ -336,7 +336,7 @@ public class GridIoManagerBenchmark {
*/
private static class SenderMessageListener implements GridMessageListener {
/** {@inheritDoc} */
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
msgCntr.increment();
if (testLatency)
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
index 0f0332f..7b1d972 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
@@ -130,7 +130,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
rcv.addMessageListener(
topic,
new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
try {
rcv.sendToCustomTopic(sndNode, topic, (Message)msg, PUBLIC_POOL);
}
@@ -141,7 +141,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
});
snd.addMessageListener(topic, new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
msgCntr.increment();
sem.release();
@@ -224,7 +224,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
rcv.addMessageListener(
topic,
new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
try {
rcv.sendToCustomTopic(sndNode, topic, (Message)msg, PUBLIC_POOL);
}
@@ -235,7 +235,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
});
snd.addMessageListener(topic, new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
map.get(((GridTestMessage)msg).id()).countDown();
}
});
@@ -324,7 +324,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
rcv.addMessageListener(
topic,
new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
try {
rcv.sendToCustomTopic(sndNode, topic, (Message)msg, PUBLIC_POOL);
}
@@ -335,7 +335,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
});
snd.addMessageListener(topic, new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
msgCntr.increment();
sem.release();
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
index 4a6b765..435af8f 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
@@ -139,7 +139,7 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest {
final CountDownLatch latch = new CountDownLatch(SAMPLE_CNT);
mgr1.addMessageListener(topic, new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
try {
latch.countDown();
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index 7d3c5d6..93cd911 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.direct.DirectMessageReader;
import org.apache.ignite.internal.direct.DirectMessageWriter;
import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.communication.GridIoUserMessage;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
@@ -328,7 +329,7 @@ public class GridSpiTestContext implements IgniteSpiContext {
@SuppressWarnings("deprecation")
public void triggerMessage(ClusterNode node, Object msg) {
for (GridMessageListener lsnr : msgLsnrs)
- lsnr.onMessage(node.id(), msg);
+ lsnr.onMessage(node.id(), msg, GridIoPolicy.SYSTEM_POOL);
}
/** {@inheritDoc} */
@@ -667,7 +668,7 @@ public class GridSpiTestContext implements IgniteSpiContext {
@SuppressWarnings({
"SynchronizationOnLocalVariableOrMethodParameter", "ConstantConditions",
"OverlyStrongTypeCast"})
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
GridIoUserMessage ioMsg = (GridIoUserMessage)msg;
Object msgBody = ioMsg.body();
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
index 3296993..cd1c93c 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
@@ -63,7 +63,7 @@ public class HadoopShuffle extends HadoopComponent {
super.start(ctx);
ctx.kernalContext().io().addMessageListener(GridTopic.TOPIC_HADOOP_MSG, new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
onMessageReceived(nodeId, (HadoopMessage)msg);
}
});
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
index 3dabc58..542adf0 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
@@ -135,7 +135,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
msgTopic = new IgniteBiTuple<>(GridTopic.TOPIC_QUERY, tbl.identifierString() + '.' + getName());
msgLsnr = new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
GridSpinBusyLock l = desc.indexing().busyLock();
if (!l.enterBusy())
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 6b7ba75..fcf5f10 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -172,7 +172,7 @@ public class GridMapQueryExecutor {
}, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
if (!busyLock.enterBusy())
return;
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index b85fa61..85a7e0b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -169,7 +169,7 @@ public class GridReduceQueryExecutor {
log = ctx.log(GridReduceQueryExecutor.class);
ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
if (!busyLock.enterBusy())
return;