You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/04 14:14:05 UTC
[1/8] ignite git commit: Pass io policy in GridMessageListener.
Repository: ignite
Updated Branches:
refs/heads/ignite-gg-12389 578864957 -> 87cb2ae60
Pass io policy in GridMessageListener.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bdd31af7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bdd31af7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bdd31af7
Branch: refs/heads/ignite-gg-12389
Commit: bdd31af760f745c9ab05f75d100649a30dfe4f1e
Parents: 7db925c
Author: sboikov <sb...@gridgain.com>
Authored: Mon Jul 3 15:45:04 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Jul 3 15:45:04 2017 +0300
----------------------------------------------------------------------
.../checkpoint/GridCheckpointManager.java | 2 +-
.../managers/communication/GridIoManager.java | 10 +--
.../communication/GridMessageListener.java | 3 +-
.../deployment/GridDeploymentCommunication.java | 4 +-
.../eventstorage/GridEventStorageManager.java | 4 +-
.../processors/cache/GridCacheIoManager.java | 77 +++++++++++---------
.../cache/binary/BinaryMetadataTransport.java | 4 +-
.../cache/transactions/IgniteTxManager.java | 2 +-
.../processors/cluster/ClusterProcessor.java | 2 +-
.../continuous/GridContinuousProcessor.java | 4 +-
.../datastreamer/DataStreamProcessor.java | 2 +-
.../datastreamer/DataStreamerImpl.java | 2 +-
.../processors/igfs/IgfsDataManager.java | 2 +-
.../igfs/IgfsFragmentizerManager.java | 4 +-
.../processors/job/GridJobProcessor.java | 8 +-
.../GridMarshallerMappingProcessor.java | 4 +-
.../processors/query/GridQueryProcessor.java | 2 +-
.../handlers/task/GridTaskCommandHandler.java | 4 +-
.../processors/task/GridTaskProcessor.java | 6 +-
.../jobstealing/JobStealingCollisionSpi.java | 2 +-
...idCommunicationManagerListenersSelfTest.java | 2 +-
.../GridCommunicationSendMessageSelfTest.java | 2 +-
.../cache/GridCachePartitionedGetSelfTest.java | 2 +-
...lerCacheClientRequestsMappingOnMissTest.java | 6 +-
...naryObjectMetadataExchangeMultinodeTest.java | 6 +-
...DeadlockDetectionMessageMarshallingTest.java | 2 +-
.../communication/GridIoManagerBenchmark.java | 4 +-
.../communication/GridIoManagerBenchmark0.java | 12 +--
.../communication/GridCacheMessageSelfTest.java | 2 +-
.../testframework/GridSpiTestContext.java | 5 +-
.../hadoop/shuffle/HadoopShuffle.java | 2 +-
.../query/h2/opt/GridH2IndexBase.java | 2 +-
.../query/h2/twostep/GridMapQueryExecutor.java | 2 +-
.../h2/twostep/GridReduceQueryExecutor.java | 2 +-
34 files changed, 106 insertions(+), 93 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
index 782ee5e..aae1a3b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
@@ -464,7 +464,7 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
* @param msg Received message.
*/
@SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
GridCheckpointRequest req = (GridCheckpointRequest)msg;
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index a1ddaf4..3ff44a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -337,7 +337,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
log.debug(startInfo());
addMessageListener(GridTopic.TOPIC_IO_TEST, new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
ClusterNode node = ctx.discovery().node(nodeId);
if (node == null)
@@ -1553,7 +1553,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
CUR_PLC.set(plc);
try {
- lsnr.onMessage(nodeId, msg);
+ lsnr.onMessage(nodeId, msg, plc);
}
finally {
if (change)
@@ -2323,14 +2323,14 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @param nodeId Node ID.
* @param msg Message.
*/
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
GridMessageListener[] arr0 = arr;
if (arr0 == null)
return;
for (GridMessageListener l : arr0)
- l.onMessage(nodeId, msg);
+ l.onMessage(nodeId, msg, plc);
}
/**
@@ -2430,7 +2430,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
/** {@inheritDoc} */
@SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter", "ConstantConditions",
"OverlyStrongTypeCast"})
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
if (!(msg instanceof GridIoUserMessage)) {
U.error(log, "Received unknown message (potentially fatal problem): " + msg);
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridMessageListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridMessageListener.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridMessageListener.java
index 3993591..c7de57c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridMessageListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridMessageListener.java
@@ -30,6 +30,7 @@ public interface GridMessageListener extends EventListener {
* @param nodeId ID of node that sent the message. Note that may have already
* left topology by the time this message is received.
* @param msg Message received.
+ * @param plc Message policy (pool).
*/
- public void onMessage(UUID nodeId, Object msg);
+ public void onMessage(UUID nodeId, Object msg, byte plc);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
index 23d186a..2a5f7ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
@@ -83,7 +83,7 @@ class GridDeploymentCommunication {
this.log = log.getLogger(getClass());
peerLsnr = new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
processDeploymentRequest(nodeId, msg);
}
};
@@ -422,7 +422,7 @@ class GridDeploymentCommunication {
};
GridMessageListener resLsnr = new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
assert nodeId != null;
assert msg != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
index bd43e43..7836004 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
@@ -1008,7 +1008,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
GridMessageListener resLsnr = new GridMessageListener() {
@SuppressWarnings("deprecation")
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
assert nodeId != null;
assert msg != null;
@@ -1185,7 +1185,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
*/
private class RequestListener implements GridMessageListener {
/** {@inheritDoc} */
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
assert nodeId != null;
assert msg != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index a920bd0..49cfcdd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -155,7 +155,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
/** Message listener. */
private GridMessageListener lsnr = new GridMessageListener() {
- @Override public void onMessage(final UUID nodeId, final Object msg) {
+ @Override public void onMessage(final UUID nodeId, final Object msg, final byte plc) {
if (log.isDebugEnabled())
log.debug("Received unordered cache communication message [nodeId=" + nodeId +
", locId=" + cctx.localNodeId() + ", msg=" + msg + ']');
@@ -196,7 +196,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
@Override public void apply(IgniteInternalFuture<?> fut) {
cctx.kernalContext().closure().runLocalSafe(new Runnable() {
@Override public void run() {
- handleMessage(nodeId, cacheMsg);
+ handleMessage(nodeId, cacheMsg, plc);
}
});
}
@@ -269,40 +269,48 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
log.debug(msg0.toString());
}
- handleMessage(nodeId, cacheMsg);
+ handleMessage(nodeId, cacheMsg, plc);
}
};
if (stripe >= 0)
cctx.kernalContext().getStripedExecutorService().execute(stripe, c);
- else
- cctx.kernalContext().closure().runLocalSafe(c);
+ else {
+ try {
+ cctx.kernalContext().pools().poolForPolicy(plc).execute(c);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(cacheMsg.messageLogger(cctx), "Failed to get pool for policy: " + plc, e);
+ }
+ }
}
});
return;
}
- handleMessage(nodeId, cacheMsg);
+ handleMessage(nodeId, cacheMsg, plc);
}
};
/**
* @param nodeId Sender node ID.
* @param cacheMsg Message.
+ * @param plc Message policy.
*/
@SuppressWarnings("unchecked")
- private void handleMessage(UUID nodeId, GridCacheMessage cacheMsg) {
- handleMessage(nodeId, cacheMsg, cacheMsg.cacheGroupMessage() ? grpHandlers : cacheHandlers);
+ private void handleMessage(UUID nodeId, GridCacheMessage cacheMsg, byte plc) {
+ handleMessage(nodeId, cacheMsg, cacheMsg.cacheGroupMessage() ? grpHandlers : cacheHandlers, plc);
}
/**
* @param nodeId Sender node ID.
* @param cacheMsg Message.
* @param msgHandlers Message handlers.
+ * @param plc Message policy.
*/
@SuppressWarnings("unchecked")
- private void handleMessage(UUID nodeId, GridCacheMessage cacheMsg, MessageHandlers msgHandlers) {
+ private void handleMessage(UUID nodeId, GridCacheMessage cacheMsg, MessageHandlers msgHandlers, byte plc) {
Lock lock = rw.readLock();
lock.lock();
@@ -356,7 +364,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
return;
}
- onMessage0(nodeId, cacheMsg, c);
+ onMessage0(nodeId, cacheMsg, c, plc);
}
finally {
lock.unlock();
@@ -517,10 +525,11 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
* @param nodeId Node ID.
* @param cacheMsg Cache message.
* @param c Handler closure.
+ * @param plc Message policy.
*/
@SuppressWarnings({"unchecked", "ConstantConditions", "ThrowableResultOfMethodCallIgnored"})
private void onMessage0(final UUID nodeId, final GridCacheMessage cacheMsg,
- final IgniteBiInClosure<UUID, GridCacheMessage> c) {
+ final IgniteBiInClosure<UUID, GridCacheMessage> c, byte plc) {
try {
if (stopping) {
if (log.isDebugEnabled())
@@ -536,7 +545,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
unmarshall(nodeId, cacheMsg);
if (cacheMsg.classError() != null)
- processFailedMessage(nodeId, cacheMsg, c);
+ processFailedMessage(nodeId, cacheMsg, c, plc);
else
processMessage(nodeId, cacheMsg, c);
}
@@ -669,15 +678,17 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
*
* @param nodeId Node ID.
* @param msg Message.
+ * @param c Closure.
+ * @param plc Message policy.
* @throws IgniteCheckedException If failed.
*/
- private void processFailedMessage(UUID nodeId, GridCacheMessage msg, IgniteBiInClosure<UUID, GridCacheMessage> c)
+ private void processFailedMessage(UUID nodeId,
+ GridCacheMessage msg,
+ IgniteBiInClosure<UUID, GridCacheMessage> c,
+ byte plc)
throws IgniteCheckedException {
assert msg != null;
- GridCacheContext ctx = msg instanceof GridCacheIdMessage ?
- cctx.cacheContext(((GridCacheIdMessage)msg).cacheId()) : null;
-
switch (msg.directType()) {
case 30: {
GridDhtLockRequest req = (GridDhtLockRequest)msg;
@@ -688,9 +699,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
req.futureId(),
req.miniId(),
0,
- ctx.deploymentEnabled());
+ false);
- sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+ sendResponseOnFailedMessage(nodeId, res, cctx, plc);
}
break;
@@ -723,7 +734,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
res.onError(req.classError());
- sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+ sendResponseOnFailedMessage(nodeId, res, cctx, plc);
if (req.nearNodeId() != null) {
GridDhtAtomicNearResponse nearRes = new GridDhtAtomicNearResponse(req.cacheId(),
@@ -734,7 +745,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
nearRes.errors(new UpdateErrors(req.classError()));
- sendResponseOnFailedMessage(req.nearNodeId(), nearRes, cctx, ctx.ioPolicy());
+ sendResponseOnFailedMessage(req.nearNodeId(), nearRes, cctx, plc);
}
}
@@ -753,7 +764,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
res.error(req.classError());
- sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+ sendResponseOnFailedMessage(nodeId, res, cctx, plc);
}
break;
@@ -770,7 +781,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
res.error(req.classError());
- sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+ sendResponseOnFailedMessage(nodeId, res, cctx, plc);
}
break;
@@ -793,7 +804,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
res.error(req.classError());
- sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+ sendResponseOnFailedMessage(nodeId, res, cctx, plc);
}
break;
@@ -831,7 +842,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
null,
false);
- sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+ sendResponseOnFailedMessage(nodeId, res, cctx, plc);
}
break;
@@ -872,7 +883,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
cctx.node(nodeId),
TOPIC_CACHE.topic(QUERY_TOPIC_PREFIX, nodeId, req.id()),
res,
- ctx.ioPolicy(),
+ plc,
Long.MAX_VALUE);
}
@@ -897,7 +908,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
res.error(req.classError());
- sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+ sendResponseOnFailedMessage(nodeId, res, cctx, plc);
}
break;
@@ -935,7 +946,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
res.error(req.classError());
- sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+ sendResponseOnFailedMessage(nodeId, res, cctx, plc);
}
break;
@@ -953,7 +964,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
res.error(req.classError());
- sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+ sendResponseOnFailedMessage(nodeId, res, cctx, plc);
}
break;
@@ -971,7 +982,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
res.error(req.classError());
- sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+ sendResponseOnFailedMessage(nodeId, res, cctx, plc);
}
break;
@@ -987,7 +998,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
res.onError(req.classError());
- sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+ sendResponseOnFailedMessage(nodeId, res, cctx, plc);
if (req.nearNodeId() != null) {
GridDhtAtomicNearResponse nearRes = new GridDhtAtomicNearResponse(req.cacheId(),
@@ -998,7 +1009,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
nearRes.errors(new UpdateErrors(req.classError()));
- sendResponseOnFailedMessage(req.nearNodeId(), nearRes, cctx, ctx.ioPolicy());
+ sendResponseOnFailedMessage(req.nearNodeId(), nearRes, cctx, plc);
}
}
@@ -1540,7 +1551,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
/** {@inheritDoc} */
@SuppressWarnings({"CatchGenericClass", "unchecked"})
- @Override public void onMessage(final UUID nodeId, Object msg) {
+ @Override public void onMessage(final UUID nodeId, Object msg, byte plc) {
if (log.isDebugEnabled())
log.debug("Received cache ordered message [nodeId=" + nodeId + ", msg=" + msg + ']');
@@ -1551,7 +1562,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
try {
GridCacheMessage cacheMsg = (GridCacheMessage)msg;
- onMessage0(nodeId, cacheMsg, c);
+ onMessage0(nodeId, cacheMsg, c, plc);
}
finally {
lock.unlock();
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
index 00c760f..5fd7295 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
@@ -561,7 +561,7 @@ final class BinaryMetadataTransport {
}
/** {@inheritDoc} */
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
assert msg instanceof MetadataRequestMessage : msg;
MetadataRequestMessage msg0 = (MetadataRequestMessage) msg;
@@ -606,7 +606,7 @@ final class BinaryMetadataTransport {
private final class MetadataResponseListener implements GridMessageListener {
/** {@inheritDoc} */
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
assert msg instanceof MetadataResponseMessage : msg;
MetadataResponseMessage msg0 = (MetadataResponseMessage) msg;
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index a9aa13d..3a3b766 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -2433,7 +2433,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
private class DeadlockDetectionListener implements GridMessageListener {
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
GridCacheMessage cacheMsg = (GridCacheMessage)msg;
Throwable err = null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
index ed83650..0bd2370 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
@@ -146,7 +146,7 @@ public class ClusterProcessor extends GridProcessorAdapter {
EVT_NODE_FAILED, EVT_NODE_LEFT);
ctx.io().addMessageListener(TOPIC_INTERNAL_DIAGNOSTIC, new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
if (msg instanceof IgniteDiagnosticMessage) {
IgniteDiagnosticMessage msg0 = (IgniteDiagnosticMessage)msg;
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index f641399..b1d3442 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -289,7 +289,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
});
ctx.io().addMessageListener(TOPIC_CONTINUOUS, new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object obj) {
+ @Override public void onMessage(UUID nodeId, Object obj, byte plc) {
GridContinuousMessage msg = (GridContinuousMessage)obj;
if (msg.data() == null && msg.dataBytes() != null) {
@@ -771,7 +771,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
private void registerMessageListener(GridContinuousHandler hnd) {
if (hnd.orderedTopic() != null) {
ctx.io().addMessageListener(hnd.orderedTopic(), new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object obj) {
+ @Override public void onMessage(UUID nodeId, Object obj, byte plc) {
GridContinuousMessage msg = (GridContinuousMessage)obj;
// Only notification can be ordered.
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index c52f7ac..31ae1e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -82,7 +82,7 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
if (!ctx.clientNode()) {
ctx.io().addMessageListener(TOPIC_DATASTREAM, new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
assert msg instanceof DataStreamerRequest;
processRequest(nodeId, (DataStreamerRequest)msg);
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 40988d3..ae441de 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -319,7 +319,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
topic = TOPIC_DATASTREAM.topic(IgniteUuid.fromUuid(ctx.localNodeId()));
ctx.io().addMessageListener(topic, new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
assert msg instanceof DataStreamerResponse;
DataStreamerResponse res = (DataStreamerResponse)msg;
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index 8ae6db8..a4ea337 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -158,7 +158,7 @@ public class IgfsDataManager extends IgfsManager {
topic = F.isEmpty(igfsName) ? TOPIC_IGFS : TOPIC_IGFS.topic(igfsName);
igfsCtx.kernalContext().io().addMessageListener(topic, new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
if (msg instanceof IgfsBlocksMessage)
processBlocksMessage(nodeId, (IgfsBlocksMessage)msg);
else if (msg instanceof IgfsAckMessage)
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
index 7797f89..0c75bc3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
@@ -453,7 +453,7 @@ public class IgfsFragmentizerManager extends IgfsManager {
}
/** {@inheritDoc} */
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
if (msg instanceof IgfsFragmentizerResponse) {
IgfsFragmentizerResponse res = (IgfsFragmentizerResponse)msg;
@@ -673,7 +673,7 @@ public class IgfsFragmentizerManager extends IgfsManager {
}
/** {@inheritDoc} */
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
if (msg instanceof IgfsFragmentizerRequest ||
msg instanceof IgfsSyncMessage) {
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index 408396a..5ae4da6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -447,7 +447,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
final Condition cond = lock.newCondition();
GridMessageListener msgLsnr = new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
String err = null;
GridJobSiblingsResponse res = null;
@@ -1856,7 +1856,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
*/
private class JobSessionListener implements GridMessageListener {
/** {@inheritDoc} */
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
assert nodeId != null;
assert msg != null;
@@ -1872,7 +1872,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
*/
private class JobCancelListener implements GridMessageListener {
/** {@inheritDoc} */
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
assert nodeId != null;
assert msg != null;
@@ -1890,7 +1890,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
*/
private class JobExecutionListener implements GridMessageListener {
/** {@inheritDoc} */
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
assert nodeId != null;
assert msg != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
index 8de6c49..2543042 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
@@ -168,7 +168,7 @@ public class GridMarshallerMappingProcessor extends GridProcessorAdapter {
}
/** {@inheritDoc} */
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
assert msg instanceof MissingMappingRequestMessage : msg;
MissingMappingRequestMessage msg0 = (MissingMappingRequestMessage) msg;
@@ -200,7 +200,7 @@ public class GridMarshallerMappingProcessor extends GridProcessorAdapter {
*/
private final class MissingMappingResponseListener implements GridMessageListener {
/** {@inheritDoc} */
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
assert msg instanceof MissingMappingResponseMessage : msg;
MissingMappingResponseMessage msg0 = (MissingMappingResponseMessage) msg;
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index dd07584..d55a129 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -207,7 +207,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
valCtx = new CacheQueryObjectValueContext(ctx);
ioLsnr = new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
if (msg instanceof SchemaOperationStatusMessage) {
SchemaOperationStatusMessage msg0 = (SchemaOperationStatusMessage)msg;
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
index 99ba335..d9b49cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
@@ -109,7 +109,7 @@ public class GridTaskCommandHandler extends GridRestCommandHandlerAdapter {
super(ctx);
ctx.io().addMessageListener(TOPIC_REST, new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
if (!(msg instanceof GridTaskResultRequest)) {
U.warn(log, "Received unexpected message instead of task result request: " + msg);
@@ -425,7 +425,7 @@ public class GridTaskCommandHandler extends GridRestCommandHandlerAdapter {
final Condition cond = lock.newCondition();
GridMessageListener msgLsnr = new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
String err = null;
GridTaskResultResponse res = null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index 6ae97dd..d7a022e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -1282,7 +1282,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
}
/** {@inheritDoc} */
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
if (msg instanceof GridJobExecuteResponse)
processJobExecuteResponse(nodeId, (GridJobExecuteResponse)msg);
else if (jobResOnly)
@@ -1326,7 +1326,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
*/
private class JobSiblingsMessageListener implements GridMessageListener {
/** {@inheritDoc} */
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
if (!(msg instanceof GridJobSiblingsRequest)) {
U.warn(log, "Received unexpected message instead of siblings request: " + msg);
@@ -1398,7 +1398,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
*/
private class TaskCancelMessageListener implements GridMessageListener {
/** {@inheritDoc} */
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
assert msg != null;
if (!(msg instanceof GridTaskCancelRequest)) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java
index 6f2c099..39a8e8c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java
@@ -648,7 +648,7 @@ public class JobStealingCollisionSpi extends IgniteSpiAdapter implements Collisi
spiCtx.addMessageListener(
msgLsnr = new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
MessageInfo info = rcvMsgMap.get(nodeId);
if (info == null) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationManagerListenersSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationManagerListenersSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationManagerListenersSelfTest.java
index 1738da8..03b7921 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationManagerListenersSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationManagerListenersSelfTest.java
@@ -159,7 +159,7 @@ public class GridCommunicationManagerListenersSelfTest extends GridCommonAbstrac
}
/** {@inheritDoc} */
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
// No-op.
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
index 29b7847..3563c77 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
@@ -143,7 +143,7 @@ public class GridCommunicationSendMessageSelfTest extends GridCommonAbstractTest
final CountDownLatch latch = new CountDownLatch(SAMPLE_CNT);
mgr1.addMessageListener(topic, new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
if (msgCls.isInstance(msg))
latch.countDown();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java
index 308f2b4..d5988f1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java
@@ -224,7 +224,7 @@ public class GridCachePartitionedGetSelfTest extends GridCommonAbstractTest {
((IgniteKernal)g).context().io().addMessageListener(
TOPIC_CACHE,
new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
info("Received message from node [nodeId=" + nodeId + ", msg=" + msg + ']');
if (msg instanceof GridNearSingleGetRequest) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingOnMissTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingOnMissTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingOnMissTest.java
index f5f2512..057b970 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingOnMissTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingOnMissTest.java
@@ -300,10 +300,10 @@ public class IgniteMarshallerCacheClientRequestsMappingOnMissTest extends GridCo
final GridMessageListener delegate = lsnrs[GridTopic.TOPIC_MAPPING_MARSH.ordinal()];
GridMessageListener wrapper = new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
mappingReqsCounter.incrementAndGet();
- delegate.onMessage(nodeId, msg);
+ delegate.onMessage(nodeId, msg, plc);
}
};
@@ -321,7 +321,7 @@ public class IgniteMarshallerCacheClientRequestsMappingOnMissTest extends GridCo
ioMgr.removeMessageListener(GridTopic.TOPIC_MAPPING_MARSH);
ioMgr.addMessageListener(GridTopic.TOPIC_MAPPING_MARSH, new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
new Thread(new Runnable() {
@Override public void run() {
mappingReqsCounter.incrementAndGet();
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectMetadataExchangeMultinodeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectMetadataExchangeMultinodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectMetadataExchangeMultinodeTest.java
index 9370e27..6ff703e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectMetadataExchangeMultinodeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectMetadataExchangeMultinodeTest.java
@@ -396,7 +396,7 @@ public class GridCacheBinaryObjectMetadataExchangeMultinodeTest extends GridComm
ioMgr.removeMessageListener(GridTopic.TOPIC_METADATA_REQ);
ioMgr.addMessageListener(GridTopic.TOPIC_METADATA_REQ, new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
new Thread(new Runnable() {
@Override public void run() {
metadataReqsCounter.incrementAndGet();
@@ -416,9 +416,9 @@ public class GridCacheBinaryObjectMetadataExchangeMultinodeTest extends GridComm
final GridMessageListener delegate = lsnrs[GridTopic.TOPIC_METADATA_REQ.ordinal()];
GridMessageListener wrapper = new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
metadataReqsCounter.incrementAndGet();
- delegate.onMessage(nodeId, msg);
+ delegate.onMessage(nodeId, msg, plc);
}
};
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionMessageMarshallingTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionMessageMarshallingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionMessageMarshallingTest.java
index 9126053..1a48cec 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionMessageMarshallingTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionMessageMarshallingTest.java
@@ -76,7 +76,7 @@ public class TxDeadlockDetectionMessageMarshallingTest extends GridCommonAbstrac
final AtomicBoolean res = new AtomicBoolean();
clientCtx.gridIO().addMessageListener(TOPIC, new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
if (msg instanceof TxLocksResponse) {
try {
((TxLocksResponse)msg).finishUnmarshal(clientCtx, clientCtx.deploy().globalLoader());
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java
index 03bbb00..5671158 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java
@@ -240,7 +240,7 @@ public class GridIoManagerBenchmark {
GridMessageListener lsnr = new GridMessageListener() {
private ClusterNode node;
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
if (node == null)
node = g.context().discovery().node(nodeId);
@@ -336,7 +336,7 @@ public class GridIoManagerBenchmark {
*/
private static class SenderMessageListener implements GridMessageListener {
/** {@inheritDoc} */
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
msgCntr.increment();
if (testLatency)
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
index 0f0332f..7b1d972 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
@@ -130,7 +130,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
rcv.addMessageListener(
topic,
new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
try {
rcv.sendToCustomTopic(sndNode, topic, (Message)msg, PUBLIC_POOL);
}
@@ -141,7 +141,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
});
snd.addMessageListener(topic, new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
msgCntr.increment();
sem.release();
@@ -224,7 +224,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
rcv.addMessageListener(
topic,
new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
try {
rcv.sendToCustomTopic(sndNode, topic, (Message)msg, PUBLIC_POOL);
}
@@ -235,7 +235,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
});
snd.addMessageListener(topic, new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
map.get(((GridTestMessage)msg).id()).countDown();
}
});
@@ -324,7 +324,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
rcv.addMessageListener(
topic,
new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
try {
rcv.sendToCustomTopic(sndNode, topic, (Message)msg, PUBLIC_POOL);
}
@@ -335,7 +335,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
});
snd.addMessageListener(topic, new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
msgCntr.increment();
sem.release();
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
index 4a6b765..435af8f 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
@@ -139,7 +139,7 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest {
final CountDownLatch latch = new CountDownLatch(SAMPLE_CNT);
mgr1.addMessageListener(topic, new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
try {
latch.countDown();
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index 7d3c5d6..93cd911 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.direct.DirectMessageReader;
import org.apache.ignite.internal.direct.DirectMessageWriter;
import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.communication.GridIoUserMessage;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
@@ -328,7 +329,7 @@ public class GridSpiTestContext implements IgniteSpiContext {
@SuppressWarnings("deprecation")
public void triggerMessage(ClusterNode node, Object msg) {
for (GridMessageListener lsnr : msgLsnrs)
- lsnr.onMessage(node.id(), msg);
+ lsnr.onMessage(node.id(), msg, GridIoPolicy.SYSTEM_POOL);
}
/** {@inheritDoc} */
@@ -667,7 +668,7 @@ public class GridSpiTestContext implements IgniteSpiContext {
@SuppressWarnings({
"SynchronizationOnLocalVariableOrMethodParameter", "ConstantConditions",
"OverlyStrongTypeCast"})
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
GridIoUserMessage ioMsg = (GridIoUserMessage)msg;
Object msgBody = ioMsg.body();
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
index 3296993..cd1c93c 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
@@ -63,7 +63,7 @@ public class HadoopShuffle extends HadoopComponent {
super.start(ctx);
ctx.kernalContext().io().addMessageListener(GridTopic.TOPIC_HADOOP_MSG, new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
onMessageReceived(nodeId, (HadoopMessage)msg);
}
});
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
index 3dabc58..542adf0 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
@@ -135,7 +135,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
msgTopic = new IgniteBiTuple<>(GridTopic.TOPIC_QUERY, tbl.identifierString() + '.' + getName());
msgLsnr = new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
GridSpinBusyLock l = desc.indexing().busyLock();
if (!l.enterBusy())
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 6b7ba75..fcf5f10 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -172,7 +172,7 @@ public class GridMapQueryExecutor {
}, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
if (!busyLock.enterBusy())
return;
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index b85fa61..85a7e0b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -169,7 +169,7 @@ public class GridReduceQueryExecutor {
log = ctx.log(GridReduceQueryExecutor.class);
ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
if (!busyLock.enterBusy())
return;
[7/8] ignite git commit: Merge master into ignite-2.1.2
Posted by sb...@apache.org.
Merge master into ignite-2.1.2
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/15613e2a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/15613e2a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/15613e2a
Branch: refs/heads/ignite-gg-12389
Commit: 15613e2af5e0a4a0014bb5c6d6f6915038b1be1a
Parents: d846197 5093660
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Tue Jul 4 12:39:38 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Tue Jul 4 12:39:38 2017 +0300
----------------------------------------------------------------------
.../checkpoint/GridCheckpointManager.java | 2 +-
.../managers/communication/GridIoManager.java | 10 +--
.../communication/GridMessageListener.java | 3 +-
.../deployment/GridDeploymentCommunication.java | 4 +-
.../eventstorage/GridEventStorageManager.java | 4 +-
.../processors/cache/GridCacheIoManager.java | 85 +++++++++++---------
.../GridCachePartitionExchangeManager.java | 16 ++--
.../cache/binary/BinaryMetadataTransport.java | 4 +-
.../GridDhtPartitionsExchangeFuture.java | 11 +--
.../cache/transactions/IgniteTxManager.java | 2 +-
.../processors/cluster/ClusterProcessor.java | 2 +-
.../continuous/GridContinuousProcessor.java | 4 +-
.../datastreamer/DataStreamProcessor.java | 2 +-
.../datastreamer/DataStreamerImpl.java | 2 +-
.../processors/igfs/IgfsDataManager.java | 2 +-
.../igfs/IgfsFragmentizerManager.java | 4 +-
.../processors/job/GridJobProcessor.java | 8 +-
.../GridMarshallerMappingProcessor.java | 4 +-
.../processors/query/GridQueryProcessor.java | 2 +-
.../handlers/task/GridTaskCommandHandler.java | 4 +-
.../processors/task/GridTaskProcessor.java | 6 +-
.../jobstealing/JobStealingCollisionSpi.java | 2 +-
...idCommunicationManagerListenersSelfTest.java | 2 +-
.../GridCommunicationSendMessageSelfTest.java | 2 +-
.../cache/GridCachePartitionedGetSelfTest.java | 2 +-
...lerCacheClientRequestsMappingOnMissTest.java | 6 +-
...naryObjectMetadataExchangeMultinodeTest.java | 6 +-
...DeadlockDetectionMessageMarshallingTest.java | 2 +-
.../communication/GridIoManagerBenchmark.java | 4 +-
.../communication/GridIoManagerBenchmark0.java | 12 +--
.../communication/GridCacheMessageSelfTest.java | 2 +-
.../testframework/GridSpiTestContext.java | 5 +-
.../hadoop/shuffle/HadoopShuffle.java | 2 +-
.../query/h2/opt/GridH2IndexBase.java | 2 +-
.../query/h2/twostep/GridMapQueryExecutor.java | 2 +-
.../h2/twostep/GridReduceQueryExecutor.java | 2 +-
.../Apache.Ignite.Core/Impl/IgniteUtils.cs | 12 ++-
37 files changed, 129 insertions(+), 117 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/15613e2a/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/15613e2a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/15613e2a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/15613e2a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 44eedb1,49cfcdd..2de3808
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@@ -19,7 -19,7 +19,6 @@@ package org.apache.ignite.internal.proc
import java.util.ArrayList;
import java.util.Arrays;
--import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@@ -80,17 -80,17 +79,12 @@@ import org.apache.ignite.internal.proce
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxState;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxStateAware;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
--import org.apache.ignite.internal.util.F0;
--import org.apache.ignite.internal.util.GridLeanSet;
import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
import org.apache.ignite.internal.util.typedef.CI1;
--import org.apache.ignite.internal.util.typedef.F;
--import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
--import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
@@@ -355,19 -358,9 +357,19 @@@ public class GridCacheIoManager extend
if (log.isDebugEnabled())
log.debug(msg0.toString());
}
- else
+ else {
U.error(log, msg0.toString());
+ try {
+ cacheMsg.onClassError(new IgniteCheckedException("Failed to find message handler for message: " + cacheMsg));
+
- processFailedMessage(nodeId, cacheMsg, c);
++ processFailedMessage(nodeId, cacheMsg, c, plc);
+ }
+ catch (Exception e) {
+ U.error(log, "Failed to process failed message: " + e, e);
+ }
+ }
+
return;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/15613e2a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index f07119a,2d1aca0..93310e3
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@@ -863,7 -860,7 +863,7 @@@ public class GridCachePartitionExchange
* @param nodes Nodes.
*/
private void sendAllPartitions(Collection<ClusterNode> nodes) {
- GridDhtPartitionsFullMessage m = createPartitionsFullMessage(nodes, null, null, null, null, true);
- GridDhtPartitionsFullMessage m = createPartitionsFullMessage(null, null);
++ GridDhtPartitionsFullMessage m = createPartitionsFullMessage(null, null, null, null);
if (log.isDebugEnabled())
log.debug("Sending all partitions [nodeIds=" + U.nodeIds(nodes) + ", msg=" + m + ']');
@@@ -886,26 -883,18 +886,24 @@@
}
/**
- * @param nodes Target nodes.
* @param exchId Non-null exchange ID if message is created for exchange.
* @param lastVer Last version.
- * @param compress {@code True} if it is possible to use compression for message.
* @return Message.
*/
- public GridDhtPartitionsFullMessage createPartitionsFullMessage(Collection<ClusterNode> nodes,
+ public GridDhtPartitionsFullMessage createPartitionsFullMessage(
- final @Nullable GridDhtPartitionExchangeId exchId,
- @Nullable GridCacheVersion lastVer) {
+ @Nullable final GridDhtPartitionExchangeId exchId,
+ @Nullable GridCacheVersion lastVer,
+ @Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers,
- @Nullable IgniteDhtPartitionsToReloadMap partsToReload,
- final boolean compress) {
++ @Nullable IgniteDhtPartitionsToReloadMap partsToReload
++ ) {
final GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(exchId,
lastVer,
- exchId != null ? exchId.topologyVersion() : AffinityTopologyVersion.NONE);
+ exchId != null ? exchId.topologyVersion() : AffinityTopologyVersion.NONE,
+ partHistSuppliers,
+ partsToReload
+ );
- m.compress(compress);
+ m.compress(true);
final Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/15613e2a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/15613e2a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 42f60b1,7471855..a1926ee
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@@ -1156,12 -1089,8 +1155,10 @@@ public class GridDhtPartitionsExchangeF
GridCacheVersion last = lastVer.get();
GridDhtPartitionsFullMessage m = cctx.exchange().createPartitionsFullMessage(
- nodes,
exchangeId(),
- last != null ? last : cctx.versions().last());
+ last != null ? last : cctx.versions().last(),
+ partHistSuppliers,
- partsToReload,
- compress);
++ partsToReload);
if (exchangeOnChangeGlobalState && !F.isEmpty(changeGlobalStateExceptions))
m.setExceptionsMap(changeGlobalStateExceptions);
http://git-wip-us.apache.org/repos/asf/ignite/blob/15613e2a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/15613e2a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/15613e2a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/15613e2a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/15613e2a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/15613e2a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/15613e2a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/15613e2a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
----------------------------------------------------------------------
[2/8] ignite git commit: Remove some unused parameters.
Posted by sb...@apache.org.
Remove some unused parameters.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3cc13eae
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3cc13eae
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3cc13eae
Branch: refs/heads/ignite-gg-12389
Commit: 3cc13eae99b2f4f86a9c679efd31a4e87a37d4fd
Parents: bdd31af
Author: sboikov <sb...@gridgain.com>
Authored: Mon Jul 3 16:03:37 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Jul 3 16:03:37 2017 +0300
----------------------------------------------------------------------
.../cache/GridCachePartitionExchangeManager.java | 15 ++++++---------
.../preloader/GridDhtPartitionsExchangeFuture.java | 11 ++++-------
2 files changed, 10 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3cc13eae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index cbb07f9..2d1aca0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -860,7 +860,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
* @param nodes Nodes.
*/
private void sendAllPartitions(Collection<ClusterNode> nodes) {
- GridDhtPartitionsFullMessage m = createPartitionsFullMessage(nodes, null, null, true);
+ GridDhtPartitionsFullMessage m = createPartitionsFullMessage(null, null);
if (log.isDebugEnabled())
log.debug("Sending all partitions [nodeIds=" + U.nodeIds(nodes) + ", msg=" + m + ']');
@@ -883,21 +883,18 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
- * @param nodes Target nodes.
* @param exchId Non-null exchange ID if message is created for exchange.
* @param lastVer Last version.
- * @param compress {@code True} if it is possible to use compression for message.
* @return Message.
*/
- public GridDhtPartitionsFullMessage createPartitionsFullMessage(Collection<ClusterNode> nodes,
+ public GridDhtPartitionsFullMessage createPartitionsFullMessage(
final @Nullable GridDhtPartitionExchangeId exchId,
- @Nullable GridCacheVersion lastVer,
- final boolean compress) {
+ @Nullable GridCacheVersion lastVer) {
final GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(exchId,
lastVer,
exchId != null ? exchId.topologyVersion() : AffinityTopologyVersion.NONE);
- m.compress(compress);
+ m.compress(true);
final Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData = new HashMap<>();
@@ -917,7 +914,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (locMap != null) {
addFullPartitionsMap(m,
dupData,
- compress,
+ true,
grp.groupId(),
locMap,
affCache.similarAffinityKey());
@@ -935,7 +932,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (map != null) {
addFullPartitionsMap(m,
dupData,
- compress,
+ true,
top.groupId(),
map,
top.similarAffinityKey());
http://git-wip-us.apache.org/repos/asf/ignite/blob/3cc13eae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 3dc2242..7471855 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1083,17 +1083,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
/**
- * @param compress {@code True} if it is possible to use compression for message.
* @return Message.
*/
- private GridDhtPartitionsFullMessage createPartitionsMessage(Collection<ClusterNode> nodes, boolean compress) {
+ private GridDhtPartitionsFullMessage createPartitionsMessage() {
GridCacheVersion last = lastVer.get();
GridDhtPartitionsFullMessage m = cctx.exchange().createPartitionsFullMessage(
- nodes,
exchangeId(),
- last != null ? last : cctx.versions().last(),
- compress);
+ last != null ? last : cctx.versions().last());
if (exchangeOnChangeGlobalState && !F.isEmpty(changeGlobalStateExceptions))
m.setExceptionsMap(changeGlobalStateExceptions);
@@ -1106,7 +1103,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
* @throws IgniteCheckedException If failed.
*/
private void sendAllPartitions(Collection<ClusterNode> nodes) throws IgniteCheckedException {
- GridDhtPartitionsFullMessage m = createPartitionsMessage(nodes, true);
+ GridDhtPartitionsFullMessage m = createPartitionsMessage();
assert !nodes.contains(cctx.localNode());
@@ -1390,7 +1387,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
Map<Integer, Map<Integer, List<UUID>>> assignmentChange = fut.get();
- GridDhtPartitionsFullMessage m = createPartitionsMessage(null, false);
+ GridDhtPartitionsFullMessage m = createPartitionsMessage();
CacheAffinityChangeMessage msg = new CacheAffinityChangeMessage(exchId, m, assignmentChange);
[5/8] ignite git commit: Reduced amount of debug logging.
Posted by sb...@apache.org.
Reduced amount of debug logging.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/211caf15
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/211caf15
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/211caf15
Branch: refs/heads/ignite-gg-12389
Commit: 211caf15fa4d2f01ac09c7de414272b8c0d8d908
Parents: ae5ec94
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jul 4 11:21:49 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Jul 4 11:21:49 2017 +0300
----------------------------------------------------------------------
.../processors/affinity/GridAffinityAssignmentCache.java | 8 +++++++-
.../processors/cache/GridCachePartitionExchangeManager.java | 8 ++++++--
2 files changed, 13 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/211caf15/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index 71ec3ea..a8c6c59 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -431,8 +431,10 @@ public class GridAffinityAssignmentCache {
/**
* Dumps debug information.
+ *
+ * @return {@code True} if there are pending futures.
*/
- public void dumpDebugInfo() {
+ public boolean dumpDebugInfo() {
if (!readyFuts.isEmpty()) {
U.warn(log, "First 3 pending affinity ready futures [grp=" + cacheOrGrpName +
", total=" + readyFuts.size() +
@@ -446,7 +448,11 @@ public class GridAffinityAssignmentCache {
if (++cnt == 3)
break;
}
+
+ return true;
}
+
+ return false;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/211caf15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 2b5a4ff..f07119a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1636,6 +1636,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
}
+ int affDumpCnt = 0;
+
for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
if (grp.isLocal())
continue;
@@ -1647,8 +1649,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
GridAffinityAssignmentCache aff = grp.affinity();
- if (aff != null)
- aff.dumpDebugInfo();
+ if (aff != null && affDumpCnt < 5) {
+ if (aff.dumpDebugInfo())
+ affDumpCnt++;
+ }
}
}
[8/8] ignite git commit: Merge remote-tracking branch
'remotes/community/ignite-2.1' into ignite-gg-12389
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/community/ignite-2.1' into ignite-gg-12389
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/87cb2ae6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/87cb2ae6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/87cb2ae6
Branch: refs/heads/ignite-gg-12389
Commit: 87cb2ae604fb722892c3383cf1b70966c0f3f16e
Parents: 5788649 15613e2
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jul 4 17:11:48 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Jul 4 17:11:48 2017 +0300
----------------------------------------------------------------------
.../checkpoint/GridCheckpointManager.java | 2 +-
.../managers/communication/GridIoManager.java | 10 +--
.../communication/GridMessageListener.java | 3 +-
.../deployment/GridDeploymentCommunication.java | 4 +-
.../eventstorage/GridEventStorageManager.java | 4 +-
.../affinity/GridAffinityAssignmentCache.java | 8 +-
.../processors/cache/GridCacheIoManager.java | 79 +++++++++++---------
.../GridCachePartitionExchangeManager.java | 24 +++---
.../cache/binary/BinaryMetadataTransport.java | 4 +-
.../GridDhtPartitionsExchangeFuture.java | 11 +--
.../cache/transactions/IgniteTxManager.java | 2 +-
.../processors/cluster/ClusterProcessor.java | 2 +-
.../continuous/GridContinuousProcessor.java | 4 +-
.../datastreamer/DataStreamProcessor.java | 2 +-
.../datastreamer/DataStreamerImpl.java | 2 +-
.../processors/igfs/IgfsDataManager.java | 2 +-
.../igfs/IgfsFragmentizerManager.java | 4 +-
.../processors/job/GridJobProcessor.java | 8 +-
.../GridMarshallerMappingProcessor.java | 4 +-
.../processors/query/GridQueryProcessor.java | 2 +-
.../handlers/task/GridTaskCommandHandler.java | 4 +-
.../processors/task/GridTaskProcessor.java | 6 +-
.../jobstealing/JobStealingCollisionSpi.java | 2 +-
...idCommunicationManagerListenersSelfTest.java | 2 +-
.../GridCommunicationSendMessageSelfTest.java | 2 +-
.../cache/GridCachePartitionedGetSelfTest.java | 2 +-
...lerCacheClientRequestsMappingOnMissTest.java | 6 +-
...naryObjectMetadataExchangeMultinodeTest.java | 6 +-
...DeadlockDetectionMessageMarshallingTest.java | 2 +-
.../communication/GridIoManagerBenchmark.java | 4 +-
.../communication/GridIoManagerBenchmark0.java | 12 +--
.../communication/GridCacheMessageSelfTest.java | 2 +-
.../testframework/GridSpiTestContext.java | 5 +-
.../hadoop/shuffle/HadoopShuffle.java | 2 +-
.../query/h2/opt/GridH2IndexBase.java | 2 +-
.../query/h2/twostep/GridMapQueryExecutor.java | 2 +-
.../h2/twostep/GridReduceQueryExecutor.java | 2 +-
.../Apache.Ignite.Core/Impl/IgniteUtils.cs | 12 ++-
.../generator/ConfigurationGenerator.js | 9 ++-
39 files changed, 148 insertions(+), 117 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/87cb2ae6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/87cb2ae6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/87cb2ae6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index c9182ec,a1926ee..d2761cb
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@@ -1269,12 -1158,10 +1268,10 @@@ public class GridDhtPartitionsExchangeF
exchangeId(),
last != null ? last : cctx.versions().last(),
partHistSuppliers,
- partsToReload,
- compress);
+ partsToReload);
- if (exchangeOnChangeGlobalState && !F.isEmpty(changeGlobalStateExceptions))
- m.setExceptionsMap(changeGlobalStateExceptions);
+ if (stateChangeExchange() && !F.isEmpty(changeGlobalStateExceptions))
+ m.setErrorsMap(changeGlobalStateExceptions);
return m;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/87cb2ae6/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/87cb2ae6/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/87cb2ae6/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
----------------------------------------------------------------------
[4/8] ignite git commit: IGNITE-5628 .NET: Fix jvm.dll lookup paths
for JRE
Posted by sb...@apache.org.
IGNITE-5628 .NET: Fix jvm.dll lookup paths for JRE
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5093660d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5093660d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5093660d
Branch: refs/heads/ignite-gg-12389
Commit: 5093660d758ad4149d5cd135f3cad3dfee0ae6e4
Parents: c08849c
Author: Pavel Tupitsyn <pt...@apache.org>
Authored: Mon Jul 3 17:58:47 2017 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Mon Jul 3 17:58:47 2017 +0300
----------------------------------------------------------------------
.../dotnet/Apache.Ignite.Core/Impl/IgniteUtils.cs | 12 +++++++++++-
1 file changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5093660d/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteUtils.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteUtils.cs
index b024345..f3bdd2b 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteUtils.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteUtils.cs
@@ -46,7 +46,17 @@ namespace Apache.Ignite.Core.Impl
private const string EnvJavaHome = "JAVA_HOME";
/** Lookup paths. */
- private static readonly string[] JvmDllLookupPaths = {@"jre\bin\server", @"jre\bin\default"};
+ private static readonly string[] JvmDllLookupPaths =
+ {
+ // JRE paths
+ @"bin\server",
+ @"bin\client",
+
+ // JDK paths
+ @"jre\bin\server",
+ @"jre\bin\client",
+ @"jre\bin\default"
+ };
/** Registry lookup paths. */
private static readonly string[] JreRegistryKeys =
[6/8] ignite git commit: IGNITE-5683 Fixed missing fully qualified
class names for generated indexed types on Models screen.
Posted by sb...@apache.org.
IGNITE-5683 Fixed missing fully qualified class names for generated indexed types on Models screen.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d8461977
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d8461977
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d8461977
Branch: refs/heads/ignite-gg-12389
Commit: d84619775e3960f30890a467b897315deed20ab7
Parents: 211caf1
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Tue Jul 4 15:44:27 2017 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Tue Jul 4 15:44:27 2017 +0700
----------------------------------------------------------------------
.../configuration/generator/ConfigurationGenerator.js | 9 ++++++---
1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d8461977/modules/web-console/frontend/app/modules/configuration/generator/ConfigurationGenerator.js
----------------------------------------------------------------------
diff --git a/modules/web-console/frontend/app/modules/configuration/generator/ConfigurationGenerator.js b/modules/web-console/frontend/app/modules/configuration/generator/ConfigurationGenerator.js
index a903ec4..f850dce 100644
--- a/modules/web-console/frontend/app/modules/configuration/generator/ConfigurationGenerator.js
+++ b/modules/web-console/frontend/app/modules/configuration/generator/ConfigurationGenerator.js
@@ -1653,8 +1653,11 @@ export default class IgniteConfigurationGenerator {
static domainModelGeneral(domain, cfg = this.domainConfigurationBean(domain)) {
switch (cfg.valueOf('queryMetadata')) {
case 'Annotations':
- if (_.nonNil(domain.keyType) && _.nonNil(domain.valueType))
- cfg.varArgProperty('indexedTypes', 'indexedTypes', [domain.keyType, domain.valueType], 'java.lang.Class');
+ if (_.nonNil(domain.keyType) && _.nonNil(domain.valueType)) {
+ cfg.varArgProperty('indexedTypes', 'indexedTypes',
+ [javaTypes.fullClassName(domain.keyType), javaTypes.fullClassName(domain.valueType)],
+ 'java.lang.Class');
+ }
break;
case 'Configuration':
@@ -1864,7 +1867,7 @@ export default class IgniteConfigurationGenerator {
static cacheQuery(cache, domains, available, ccfg = this.cacheConfigurationBean(cache)) {
const indexedTypes = _.reduce(domains, (acc, domain) => {
if (domain.queryMetadata === 'Annotations')
- acc.push(domain.keyType, domain.valueType);
+ acc.push(javaTypes.fullClassName(domain.keyType), javaTypes.fullClassName(domain.valueType));
return acc;
}, []);
[3/8] ignite git commit: IGNITE-5613 - Fixed race on local sequence
increment and distributed update
Posted by sb...@apache.org.
IGNITE-5613 - Fixed race on local sequence increment and distributed update
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c08849cd
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c08849cd
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c08849cd
Branch: refs/heads/ignite-gg-12389
Commit: c08849cdb362f1c699afb5d04383fa3200193539
Parents: 3cc13eae
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Mon Jul 3 17:05:48 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Mon Jul 3 17:06:38 2017 +0300
----------------------------------------------------------------------
.../GridCacheAtomicSequenceImpl.java | 55 ++++++++++++--------
...titionedAtomicSequenceMultiThreadedTest.java | 32 ++++++++++++
2 files changed, 64 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c08849cd/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
index 31ec16f..0354a17 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
@@ -385,39 +385,48 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
long newUpBound;
- curLocVal = locVal;
+ // Even though we hold a transaction lock here, we must hold the local update lock here as well
+ // because we mutate multipe variables (locVal and upBound).
+ localUpdate.lock();
- // If local range was already reserved in another thread.
- if (curLocVal + l <= upBound) {
- locVal = curLocVal + l;
+ try {
+ curLocVal = locVal;
- return updated ? curLocVal + l : curLocVal;
- }
+ // If local range was already reserved in another thread.
+ if (curLocVal + l <= upBound) {
+ locVal = curLocVal + l;
- long curGlobalVal = seq.get();
+ return updated ? curLocVal + l : curLocVal;
+ }
- long newLocVal;
+ long curGlobalVal = seq.get();
- /* We should use offset because we already reserved left side of range.*/
- long off = batchSize > 1 ? batchSize - 1 : 1;
+ long newLocVal;
- // Calculate new values for local counter, global counter and upper bound.
- if (curLocVal + l >= curGlobalVal) {
- newLocVal = curLocVal + l;
+ /* We should use offset because we already reserved left side of range.*/
+ long off = batchSize > 1 ? batchSize - 1 : 1;
- newUpBound = newLocVal + off;
- }
- else {
- newLocVal = curGlobalVal;
+ // Calculate new values for local counter, global counter and upper bound.
+ if (curLocVal + l >= curGlobalVal) {
+ newLocVal = curLocVal + l;
- newUpBound = newLocVal + off;
- }
+ newUpBound = newLocVal + off;
+ }
+ else {
+ newLocVal = curGlobalVal;
- locVal = newLocVal;
- upBound = newUpBound;
+ newUpBound = newLocVal + off;
+ }
- if (updated)
- curLocVal = newLocVal;
+ locVal = newLocVal;
+ upBound = newUpBound;
+
+ if (updated)
+ curLocVal = newLocVal;
+ }
+ finally {
+ localUpdate.unlock();
+ }
// Global counter must be more than reserved upper bound.
seq.set(newUpBound + 1);
http://git-wip-us.apache.org/repos/asf/ignite/blob/c08849cd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicSequenceMultiThreadedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicSequenceMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicSequenceMultiThreadedTest.java
index 945650d..4db9bd3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicSequenceMultiThreadedTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicSequenceMultiThreadedTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.datastructures.partitioned;
import java.util.Random;
import java.util.UUID;
+import java.util.concurrent.Callable;
import org.apache.ignite.IgniteAtomicSequence;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheMode;
@@ -26,6 +27,7 @@ import org.apache.ignite.configuration.AtomicConfiguration;
import org.apache.ignite.internal.processors.cache.datastructures.IgniteAtomicsAbstractTest;
import org.apache.ignite.internal.processors.datastructures.GridCacheAtomicSequenceImpl;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
@@ -281,6 +283,36 @@ public class GridCachePartitionedAtomicSequenceMultiThreadedTest extends IgniteA
}
/**
+ * @throws Exception if failed.
+ */
+ public void testMultipleSequences() throws Exception {
+ final int seqCnt = 5;
+ final int threadCnt = 5;
+ final int incCnt = 1_000;
+
+ final IgniteAtomicSequence[] seqs = new IgniteAtomicSequence[seqCnt];
+
+ String seqName = UUID.randomUUID().toString();
+
+ for (int i = 0; i < seqs.length; i++)
+ seqs[i] = grid(0).atomicSequence(seqName, 0, true);
+
+ GridTestUtils.runMultiThreaded(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ for (int i = 0; i < incCnt; i++) {
+ for (IgniteAtomicSequence seq : seqs)
+ seq.incrementAndGet();
+ }
+
+ return null;
+ }
+ }, threadCnt, "load");
+
+ for (IgniteAtomicSequence seq : seqs)
+ assertEquals(seqCnt * threadCnt * incCnt, seq.get());
+ }
+
+ /**
* Executes given closure in a given number of threads given number of times.
*
* @param c Closure to execute.