You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/04 14:14:05 UTC

[1/8] ignite git commit: Pass io policy in GridMessageListener.

Repository: ignite
Updated Branches:
  refs/heads/ignite-gg-12389 578864957 -> 87cb2ae60


Pass io policy in GridMessageListener.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bdd31af7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bdd31af7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bdd31af7

Branch: refs/heads/ignite-gg-12389
Commit: bdd31af760f745c9ab05f75d100649a30dfe4f1e
Parents: 7db925c
Author: sboikov <sb...@gridgain.com>
Authored: Mon Jul 3 15:45:04 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Jul 3 15:45:04 2017 +0300

----------------------------------------------------------------------
 .../checkpoint/GridCheckpointManager.java       |  2 +-
 .../managers/communication/GridIoManager.java   | 10 +--
 .../communication/GridMessageListener.java      |  3 +-
 .../deployment/GridDeploymentCommunication.java |  4 +-
 .../eventstorage/GridEventStorageManager.java   |  4 +-
 .../processors/cache/GridCacheIoManager.java    | 77 +++++++++++---------
 .../cache/binary/BinaryMetadataTransport.java   |  4 +-
 .../cache/transactions/IgniteTxManager.java     |  2 +-
 .../processors/cluster/ClusterProcessor.java    |  2 +-
 .../continuous/GridContinuousProcessor.java     |  4 +-
 .../datastreamer/DataStreamProcessor.java       |  2 +-
 .../datastreamer/DataStreamerImpl.java          |  2 +-
 .../processors/igfs/IgfsDataManager.java        |  2 +-
 .../igfs/IgfsFragmentizerManager.java           |  4 +-
 .../processors/job/GridJobProcessor.java        |  8 +-
 .../GridMarshallerMappingProcessor.java         |  4 +-
 .../processors/query/GridQueryProcessor.java    |  2 +-
 .../handlers/task/GridTaskCommandHandler.java   |  4 +-
 .../processors/task/GridTaskProcessor.java      |  6 +-
 .../jobstealing/JobStealingCollisionSpi.java    |  2 +-
 ...idCommunicationManagerListenersSelfTest.java |  2 +-
 .../GridCommunicationSendMessageSelfTest.java   |  2 +-
 .../cache/GridCachePartitionedGetSelfTest.java  |  2 +-
 ...lerCacheClientRequestsMappingOnMissTest.java |  6 +-
 ...naryObjectMetadataExchangeMultinodeTest.java |  6 +-
 ...DeadlockDetectionMessageMarshallingTest.java |  2 +-
 .../communication/GridIoManagerBenchmark.java   |  4 +-
 .../communication/GridIoManagerBenchmark0.java  | 12 +--
 .../communication/GridCacheMessageSelfTest.java |  2 +-
 .../testframework/GridSpiTestContext.java       |  5 +-
 .../hadoop/shuffle/HadoopShuffle.java           |  2 +-
 .../query/h2/opt/GridH2IndexBase.java           |  2 +-
 .../query/h2/twostep/GridMapQueryExecutor.java  |  2 +-
 .../h2/twostep/GridReduceQueryExecutor.java     |  2 +-
 34 files changed, 106 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
index 782ee5e..aae1a3b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
@@ -464,7 +464,7 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
          * @param msg Received message.
          */
         @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             GridCheckpointRequest req = (GridCheckpointRequest)msg;
 
             if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index a1ddaf4..3ff44a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -337,7 +337,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             log.debug(startInfo());
 
         addMessageListener(GridTopic.TOPIC_IO_TEST, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 ClusterNode node = ctx.discovery().node(nodeId);
 
                 if (node == null)
@@ -1553,7 +1553,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             CUR_PLC.set(plc);
 
         try {
-            lsnr.onMessage(nodeId, msg);
+            lsnr.onMessage(nodeId, msg, plc);
         }
         finally {
             if (change)
@@ -2323,14 +2323,14 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
          * @param nodeId Node ID.
          * @param msg Message.
          */
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             GridMessageListener[] arr0 = arr;
 
             if (arr0 == null)
                 return;
 
             for (GridMessageListener l : arr0)
-                l.onMessage(nodeId, msg);
+                l.onMessage(nodeId, msg, plc);
         }
 
         /**
@@ -2430,7 +2430,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         /** {@inheritDoc} */
         @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter", "ConstantConditions",
             "OverlyStrongTypeCast"})
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             if (!(msg instanceof GridIoUserMessage)) {
                 U.error(log, "Received unknown message (potentially fatal problem): " + msg);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridMessageListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridMessageListener.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridMessageListener.java
index 3993591..c7de57c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridMessageListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridMessageListener.java
@@ -30,6 +30,7 @@ public interface GridMessageListener extends EventListener {
      * @param nodeId ID of node that sent the message. Note that may have already
      *      left topology by the time this message is received.
      * @param msg Message received.
+     * @param plc Message policy (pool).
      */
-    public void onMessage(UUID nodeId, Object msg);
+    public void onMessage(UUID nodeId, Object msg, byte plc);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
index 23d186a..2a5f7ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
@@ -83,7 +83,7 @@ class GridDeploymentCommunication {
         this.log = log.getLogger(getClass());
 
         peerLsnr = new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 processDeploymentRequest(nodeId, msg);
             }
         };
@@ -422,7 +422,7 @@ class GridDeploymentCommunication {
         };
 
         GridMessageListener resLsnr = new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 assert nodeId != null;
                 assert msg != null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
index bd43e43..7836004 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
@@ -1008,7 +1008,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
 
         GridMessageListener resLsnr = new GridMessageListener() {
             @SuppressWarnings("deprecation")
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 assert nodeId != null;
                 assert msg != null;
 
@@ -1185,7 +1185,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
      */
     private class RequestListener implements GridMessageListener {
         /** {@inheritDoc} */
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             assert nodeId != null;
             assert msg != null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index a920bd0..49cfcdd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -155,7 +155,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
     /** Message listener. */
     private GridMessageListener lsnr = new GridMessageListener() {
-        @Override public void onMessage(final UUID nodeId, final Object msg) {
+        @Override public void onMessage(final UUID nodeId, final Object msg, final byte plc) {
             if (log.isDebugEnabled())
                 log.debug("Received unordered cache communication message [nodeId=" + nodeId +
                     ", locId=" + cctx.localNodeId() + ", msg=" + msg + ']');
@@ -196,7 +196,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                             @Override public void apply(IgniteInternalFuture<?> fut) {
                                 cctx.kernalContext().closure().runLocalSafe(new Runnable() {
                                     @Override public void run() {
-                                        handleMessage(nodeId, cacheMsg);
+                                        handleMessage(nodeId, cacheMsg, plc);
                                     }
                                 });
                             }
@@ -269,40 +269,48 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                                     log.debug(msg0.toString());
                                 }
 
-                                handleMessage(nodeId, cacheMsg);
+                                handleMessage(nodeId, cacheMsg, plc);
                             }
                         };
 
                         if (stripe >= 0)
                             cctx.kernalContext().getStripedExecutorService().execute(stripe, c);
-                        else
-                            cctx.kernalContext().closure().runLocalSafe(c);
+                        else {
+                            try {
+                                cctx.kernalContext().pools().poolForPolicy(plc).execute(c);
+                            }
+                            catch (IgniteCheckedException e) {
+                                U.error(cacheMsg.messageLogger(cctx), "Failed to get pool for policy: " + plc, e);
+                            }
+                        }
                     }
                 });
 
                 return;
             }
 
-            handleMessage(nodeId, cacheMsg);
+            handleMessage(nodeId, cacheMsg, plc);
         }
     };
 
     /**
      * @param nodeId Sender node ID.
      * @param cacheMsg Message.
+     * @param plc Message policy.
      */
     @SuppressWarnings("unchecked")
-    private void handleMessage(UUID nodeId, GridCacheMessage cacheMsg) {
-        handleMessage(nodeId, cacheMsg, cacheMsg.cacheGroupMessage() ? grpHandlers : cacheHandlers);
+    private void handleMessage(UUID nodeId, GridCacheMessage cacheMsg, byte plc) {
+        handleMessage(nodeId, cacheMsg, cacheMsg.cacheGroupMessage() ? grpHandlers : cacheHandlers, plc);
     }
 
     /**
      * @param nodeId Sender node ID.
      * @param cacheMsg Message.
      * @param msgHandlers Message handlers.
+     * @param plc Message policy.
      */
     @SuppressWarnings("unchecked")
-    private void handleMessage(UUID nodeId, GridCacheMessage cacheMsg, MessageHandlers msgHandlers) {
+    private void handleMessage(UUID nodeId, GridCacheMessage cacheMsg, MessageHandlers msgHandlers, byte plc) {
         Lock lock = rw.readLock();
 
         lock.lock();
@@ -356,7 +364,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                 return;
             }
 
-            onMessage0(nodeId, cacheMsg, c);
+            onMessage0(nodeId, cacheMsg, c, plc);
         }
         finally {
             lock.unlock();
@@ -517,10 +525,11 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
      * @param nodeId Node ID.
      * @param cacheMsg Cache message.
      * @param c Handler closure.
+     * @param plc Message policy.
      */
     @SuppressWarnings({"unchecked", "ConstantConditions", "ThrowableResultOfMethodCallIgnored"})
     private void onMessage0(final UUID nodeId, final GridCacheMessage cacheMsg,
-        final IgniteBiInClosure<UUID, GridCacheMessage> c) {
+        final IgniteBiInClosure<UUID, GridCacheMessage> c, byte plc) {
         try {
             if (stopping) {
                 if (log.isDebugEnabled())
@@ -536,7 +545,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
             unmarshall(nodeId, cacheMsg);
 
             if (cacheMsg.classError() != null)
-                processFailedMessage(nodeId, cacheMsg, c);
+                processFailedMessage(nodeId, cacheMsg, c, plc);
             else
                 processMessage(nodeId, cacheMsg, c);
         }
@@ -669,15 +678,17 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
      *
      * @param nodeId Node ID.
      * @param msg Message.
+     * @param c Closure.
+     * @param plc Message policy.
      * @throws IgniteCheckedException If failed.
      */
-    private void processFailedMessage(UUID nodeId, GridCacheMessage msg, IgniteBiInClosure<UUID, GridCacheMessage> c)
+    private void processFailedMessage(UUID nodeId,
+        GridCacheMessage msg,
+        IgniteBiInClosure<UUID, GridCacheMessage> c,
+        byte plc)
         throws IgniteCheckedException {
         assert msg != null;
 
-        GridCacheContext ctx = msg instanceof GridCacheIdMessage ?
-            cctx.cacheContext(((GridCacheIdMessage)msg).cacheId()) : null;
-
         switch (msg.directType()) {
             case 30: {
                 GridDhtLockRequest req = (GridDhtLockRequest)msg;
@@ -688,9 +699,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                     req.futureId(),
                     req.miniId(),
                     0,
-                    ctx.deploymentEnabled());
+                    false);
 
-                sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+                sendResponseOnFailedMessage(nodeId, res, cctx, plc);
             }
 
             break;
@@ -723,7 +734,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
                 res.onError(req.classError());
 
-                sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+                sendResponseOnFailedMessage(nodeId, res, cctx, plc);
 
                 if (req.nearNodeId() != null) {
                     GridDhtAtomicNearResponse nearRes = new GridDhtAtomicNearResponse(req.cacheId(),
@@ -734,7 +745,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
                     nearRes.errors(new UpdateErrors(req.classError()));
 
-                    sendResponseOnFailedMessage(req.nearNodeId(), nearRes, cctx, ctx.ioPolicy());
+                    sendResponseOnFailedMessage(req.nearNodeId(), nearRes, cctx, plc);
                 }
             }
 
@@ -753,7 +764,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
                 res.error(req.classError());
 
-                sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+                sendResponseOnFailedMessage(nodeId, res, cctx, plc);
             }
 
             break;
@@ -770,7 +781,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
                 res.error(req.classError());
 
-                sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+                sendResponseOnFailedMessage(nodeId, res, cctx, plc);
             }
 
             break;
@@ -793,7 +804,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
                 res.error(req.classError());
 
-                sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+                sendResponseOnFailedMessage(nodeId, res, cctx, plc);
             }
 
             break;
@@ -831,7 +842,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                     null,
                     false);
 
-                sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+                sendResponseOnFailedMessage(nodeId, res, cctx, plc);
             }
 
             break;
@@ -872,7 +883,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                     cctx.node(nodeId),
                     TOPIC_CACHE.topic(QUERY_TOPIC_PREFIX, nodeId, req.id()),
                     res,
-                    ctx.ioPolicy(),
+                    plc,
                     Long.MAX_VALUE);
             }
 
@@ -897,7 +908,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
                 res.error(req.classError());
 
-                sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+                sendResponseOnFailedMessage(nodeId, res, cctx, plc);
             }
 
             break;
@@ -935,7 +946,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
                 res.error(req.classError());
 
-                sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+                sendResponseOnFailedMessage(nodeId, res, cctx, plc);
             }
 
             break;
@@ -953,7 +964,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
                 res.error(req.classError());
 
-                sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+                sendResponseOnFailedMessage(nodeId, res, cctx, plc);
             }
 
             break;
@@ -971,7 +982,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
                 res.error(req.classError());
 
-                sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+                sendResponseOnFailedMessage(nodeId, res, cctx, plc);
             }
 
             break;
@@ -987,7 +998,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
                 res.onError(req.classError());
 
-                sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+                sendResponseOnFailedMessage(nodeId, res, cctx, plc);
 
                 if (req.nearNodeId() != null) {
                     GridDhtAtomicNearResponse nearRes = new GridDhtAtomicNearResponse(req.cacheId(),
@@ -998,7 +1009,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
                     nearRes.errors(new UpdateErrors(req.classError()));
 
-                    sendResponseOnFailedMessage(req.nearNodeId(), nearRes, cctx, ctx.ioPolicy());
+                    sendResponseOnFailedMessage(req.nearNodeId(), nearRes, cctx, plc);
                 }
             }
 
@@ -1540,7 +1551,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
         /** {@inheritDoc} */
         @SuppressWarnings({"CatchGenericClass", "unchecked"})
-        @Override public void onMessage(final UUID nodeId, Object msg) {
+        @Override public void onMessage(final UUID nodeId, Object msg, byte plc) {
             if (log.isDebugEnabled())
                 log.debug("Received cache ordered message [nodeId=" + nodeId + ", msg=" + msg + ']');
 
@@ -1551,7 +1562,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
             try {
                 GridCacheMessage cacheMsg = (GridCacheMessage)msg;
 
-                onMessage0(nodeId, cacheMsg, c);
+                onMessage0(nodeId, cacheMsg, c, plc);
             }
             finally {
                 lock.unlock();

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
index 00c760f..5fd7295 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
@@ -561,7 +561,7 @@ final class BinaryMetadataTransport {
         }
 
         /** {@inheritDoc} */
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             assert msg instanceof MetadataRequestMessage : msg;
 
             MetadataRequestMessage msg0 = (MetadataRequestMessage) msg;
@@ -606,7 +606,7 @@ final class BinaryMetadataTransport {
     private final class MetadataResponseListener implements GridMessageListener {
 
         /** {@inheritDoc} */
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             assert msg instanceof MetadataResponseMessage : msg;
 
             MetadataResponseMessage msg0 = (MetadataResponseMessage) msg;

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index a9aa13d..3a3b766 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -2433,7 +2433,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     private class DeadlockDetectionListener implements GridMessageListener {
         /** {@inheritDoc} */
         @SuppressWarnings("unchecked")
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             GridCacheMessage cacheMsg = (GridCacheMessage)msg;
 
             Throwable err = null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
index ed83650..0bd2370 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
@@ -146,7 +146,7 @@ public class ClusterProcessor extends GridProcessorAdapter {
             EVT_NODE_FAILED, EVT_NODE_LEFT);
 
         ctx.io().addMessageListener(TOPIC_INTERNAL_DIAGNOSTIC, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 if (msg instanceof IgniteDiagnosticMessage) {
                     IgniteDiagnosticMessage msg0 = (IgniteDiagnosticMessage)msg;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index f641399..b1d3442 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -289,7 +289,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             });
 
         ctx.io().addMessageListener(TOPIC_CONTINUOUS, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object obj) {
+            @Override public void onMessage(UUID nodeId, Object obj, byte plc) {
                 GridContinuousMessage msg = (GridContinuousMessage)obj;
 
                 if (msg.data() == null && msg.dataBytes() != null) {
@@ -771,7 +771,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     private void registerMessageListener(GridContinuousHandler hnd) {
         if (hnd.orderedTopic() != null) {
             ctx.io().addMessageListener(hnd.orderedTopic(), new GridMessageListener() {
-                @Override public void onMessage(UUID nodeId, Object obj) {
+                @Override public void onMessage(UUID nodeId, Object obj, byte plc) {
                     GridContinuousMessage msg = (GridContinuousMessage)obj;
 
                     // Only notification can be ordered.

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index c52f7ac..31ae1e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -82,7 +82,7 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
 
         if (!ctx.clientNode()) {
             ctx.io().addMessageListener(TOPIC_DATASTREAM, new GridMessageListener() {
-                @Override public void onMessage(UUID nodeId, Object msg) {
+                @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                     assert msg instanceof DataStreamerRequest;
 
                     processRequest(nodeId, (DataStreamerRequest)msg);

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 40988d3..ae441de 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -319,7 +319,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
         topic = TOPIC_DATASTREAM.topic(IgniteUuid.fromUuid(ctx.localNodeId()));
 
         ctx.io().addMessageListener(topic, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 assert msg instanceof DataStreamerResponse;
 
                 DataStreamerResponse res = (DataStreamerResponse)msg;

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index 8ae6db8..a4ea337 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -158,7 +158,7 @@ public class IgfsDataManager extends IgfsManager {
         topic = F.isEmpty(igfsName) ? TOPIC_IGFS : TOPIC_IGFS.topic(igfsName);
 
         igfsCtx.kernalContext().io().addMessageListener(topic, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 if (msg instanceof IgfsBlocksMessage)
                     processBlocksMessage(nodeId, (IgfsBlocksMessage)msg);
                 else if (msg instanceof IgfsAckMessage)

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
index 7797f89..0c75bc3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
@@ -453,7 +453,7 @@ public class IgfsFragmentizerManager extends IgfsManager {
         }
 
         /** {@inheritDoc} */
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             if (msg instanceof IgfsFragmentizerResponse) {
                 IgfsFragmentizerResponse res = (IgfsFragmentizerResponse)msg;
 
@@ -673,7 +673,7 @@ public class IgfsFragmentizerManager extends IgfsManager {
         }
 
         /** {@inheritDoc} */
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             if (msg instanceof IgfsFragmentizerRequest ||
                 msg instanceof IgfsSyncMessage) {
                 if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index 408396a..5ae4da6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -447,7 +447,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
         final Condition cond = lock.newCondition();
 
         GridMessageListener msgLsnr = new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 String err = null;
                 GridJobSiblingsResponse res = null;
 
@@ -1856,7 +1856,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
      */
     private class JobSessionListener implements GridMessageListener {
         /** {@inheritDoc} */
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             assert nodeId != null;
             assert msg != null;
 
@@ -1872,7 +1872,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
      */
     private class JobCancelListener implements GridMessageListener {
         /** {@inheritDoc} */
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             assert nodeId != null;
             assert msg != null;
 
@@ -1890,7 +1890,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
      */
     private class JobExecutionListener implements GridMessageListener {
         /** {@inheritDoc} */
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             assert nodeId != null;
             assert msg != null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
index 8de6c49..2543042 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
@@ -168,7 +168,7 @@ public class GridMarshallerMappingProcessor extends GridProcessorAdapter {
         }
 
         /** {@inheritDoc} */
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             assert msg instanceof MissingMappingRequestMessage : msg;
 
             MissingMappingRequestMessage msg0 = (MissingMappingRequestMessage) msg;
@@ -200,7 +200,7 @@ public class GridMarshallerMappingProcessor extends GridProcessorAdapter {
      */
     private final class MissingMappingResponseListener implements GridMessageListener {
         /** {@inheritDoc} */
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             assert msg instanceof MissingMappingResponseMessage : msg;
 
             MissingMappingResponseMessage msg0 = (MissingMappingResponseMessage) msg;

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index dd07584..d55a129 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -207,7 +207,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
         valCtx = new CacheQueryObjectValueContext(ctx);
 
         ioLsnr = new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 if (msg instanceof SchemaOperationStatusMessage) {
                     SchemaOperationStatusMessage msg0 = (SchemaOperationStatusMessage)msg;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
index 99ba335..d9b49cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
@@ -109,7 +109,7 @@ public class GridTaskCommandHandler extends GridRestCommandHandlerAdapter {
         super(ctx);
 
         ctx.io().addMessageListener(TOPIC_REST, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 if (!(msg instanceof GridTaskResultRequest)) {
                     U.warn(log, "Received unexpected message instead of task result request: " + msg);
 
@@ -425,7 +425,7 @@ public class GridTaskCommandHandler extends GridRestCommandHandlerAdapter {
         final Condition cond = lock.newCondition();
 
         GridMessageListener msgLsnr = new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 String err = null;
                 GridTaskResultResponse res = null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index 6ae97dd..d7a022e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -1282,7 +1282,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
         }
 
         /** {@inheritDoc} */
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             if (msg instanceof GridJobExecuteResponse)
                 processJobExecuteResponse(nodeId, (GridJobExecuteResponse)msg);
             else if (jobResOnly)
@@ -1326,7 +1326,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
      */
     private class JobSiblingsMessageListener implements GridMessageListener {
         /** {@inheritDoc} */
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             if (!(msg instanceof GridJobSiblingsRequest)) {
                 U.warn(log, "Received unexpected message instead of siblings request: " + msg);
 
@@ -1398,7 +1398,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
      */
     private class TaskCancelMessageListener implements GridMessageListener {
         /** {@inheritDoc} */
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             assert msg != null;
 
             if (!(msg instanceof GridTaskCancelRequest)) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java
index 6f2c099..39a8e8c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java
@@ -648,7 +648,7 @@ public class JobStealingCollisionSpi extends IgniteSpiAdapter implements Collisi
 
         spiCtx.addMessageListener(
             msgLsnr = new GridMessageListener() {
-                @Override public void onMessage(UUID nodeId, Object msg) {
+                @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                     MessageInfo info = rcvMsgMap.get(nodeId);
 
                     if (info == null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationManagerListenersSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationManagerListenersSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationManagerListenersSelfTest.java
index 1738da8..03b7921 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationManagerListenersSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationManagerListenersSelfTest.java
@@ -159,7 +159,7 @@ public class GridCommunicationManagerListenersSelfTest extends GridCommonAbstrac
         }
 
         /** {@inheritDoc} */
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             // No-op.
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
index 29b7847..3563c77 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
@@ -143,7 +143,7 @@ public class GridCommunicationSendMessageSelfTest extends GridCommonAbstractTest
         final CountDownLatch latch = new CountDownLatch(SAMPLE_CNT);
 
         mgr1.addMessageListener(topic, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 if (msgCls.isInstance(msg))
                     latch.countDown();
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java
index 308f2b4..d5988f1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java
@@ -224,7 +224,7 @@ public class GridCachePartitionedGetSelfTest extends GridCommonAbstractTest {
                 ((IgniteKernal)g).context().io().addMessageListener(
                     TOPIC_CACHE,
                     new GridMessageListener() {
-                        @Override public void onMessage(UUID nodeId, Object msg) {
+                        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                             info("Received message from node [nodeId=" + nodeId + ", msg=" + msg + ']');
 
                             if (msg instanceof GridNearSingleGetRequest) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingOnMissTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingOnMissTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingOnMissTest.java
index f5f2512..057b970 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingOnMissTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingOnMissTest.java
@@ -300,10 +300,10 @@ public class IgniteMarshallerCacheClientRequestsMappingOnMissTest extends GridCo
         final GridMessageListener delegate = lsnrs[GridTopic.TOPIC_MAPPING_MARSH.ordinal()];
 
         GridMessageListener wrapper = new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 mappingReqsCounter.incrementAndGet();
 
-                delegate.onMessage(nodeId, msg);
+                delegate.onMessage(nodeId, msg, plc);
             }
         };
 
@@ -321,7 +321,7 @@ public class IgniteMarshallerCacheClientRequestsMappingOnMissTest extends GridCo
         ioMgr.removeMessageListener(GridTopic.TOPIC_MAPPING_MARSH);
 
         ioMgr.addMessageListener(GridTopic.TOPIC_MAPPING_MARSH, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 new Thread(new Runnable() {
                     @Override public void run() {
                         mappingReqsCounter.incrementAndGet();

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectMetadataExchangeMultinodeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectMetadataExchangeMultinodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectMetadataExchangeMultinodeTest.java
index 9370e27..6ff703e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectMetadataExchangeMultinodeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectMetadataExchangeMultinodeTest.java
@@ -396,7 +396,7 @@ public class GridCacheBinaryObjectMetadataExchangeMultinodeTest extends GridComm
         ioMgr.removeMessageListener(GridTopic.TOPIC_METADATA_REQ);
 
         ioMgr.addMessageListener(GridTopic.TOPIC_METADATA_REQ, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 new Thread(new Runnable() {
                     @Override public void run() {
                         metadataReqsCounter.incrementAndGet();
@@ -416,9 +416,9 @@ public class GridCacheBinaryObjectMetadataExchangeMultinodeTest extends GridComm
         final GridMessageListener delegate = lsnrs[GridTopic.TOPIC_METADATA_REQ.ordinal()];
 
         GridMessageListener wrapper = new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 metadataReqsCounter.incrementAndGet();
-                delegate.onMessage(nodeId, msg);
+                delegate.onMessage(nodeId, msg, plc);
             }
         };
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionMessageMarshallingTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionMessageMarshallingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionMessageMarshallingTest.java
index 9126053..1a48cec 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionMessageMarshallingTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionMessageMarshallingTest.java
@@ -76,7 +76,7 @@ public class TxDeadlockDetectionMessageMarshallingTest extends GridCommonAbstrac
             final AtomicBoolean res = new AtomicBoolean();
 
             clientCtx.gridIO().addMessageListener(TOPIC, new GridMessageListener() {
-                @Override public void onMessage(UUID nodeId, Object msg) {
+                @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                     if (msg instanceof TxLocksResponse) {
                         try {
                             ((TxLocksResponse)msg).finishUnmarshal(clientCtx, clientCtx.deploy().globalLoader());

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java
index 03bbb00..5671158 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java
@@ -240,7 +240,7 @@ public class GridIoManagerBenchmark {
         GridMessageListener lsnr = new GridMessageListener() {
             private ClusterNode node;
 
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 if (node == null)
                     node = g.context().discovery().node(nodeId);
 
@@ -336,7 +336,7 @@ public class GridIoManagerBenchmark {
      */
     private static class SenderMessageListener implements GridMessageListener {
         /** {@inheritDoc} */
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             msgCntr.increment();
 
             if (testLatency)

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
index 0f0332f..7b1d972 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
@@ -130,7 +130,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
         rcv.addMessageListener(
             topic,
             new GridMessageListener() {
-                @Override public void onMessage(UUID nodeId, Object msg) {
+                @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                     try {
                         rcv.sendToCustomTopic(sndNode, topic, (Message)msg, PUBLIC_POOL);
                     }
@@ -141,7 +141,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
             });
 
         snd.addMessageListener(topic, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 msgCntr.increment();
 
                 sem.release();
@@ -224,7 +224,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
         rcv.addMessageListener(
             topic,
             new GridMessageListener() {
-                @Override public void onMessage(UUID nodeId, Object msg) {
+                @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                     try {
                         rcv.sendToCustomTopic(sndNode, topic, (Message)msg, PUBLIC_POOL);
                     }
@@ -235,7 +235,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
             });
 
         snd.addMessageListener(topic, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 map.get(((GridTestMessage)msg).id()).countDown();
             }
         });
@@ -324,7 +324,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
         rcv.addMessageListener(
             topic,
             new GridMessageListener() {
-                @Override public void onMessage(UUID nodeId, Object msg) {
+                @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                     try {
                         rcv.sendToCustomTopic(sndNode, topic, (Message)msg, PUBLIC_POOL);
                     }
@@ -335,7 +335,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
             });
 
         snd.addMessageListener(topic, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 msgCntr.increment();
 
                 sem.release();

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
index 4a6b765..435af8f 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
@@ -139,7 +139,7 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest {
         final CountDownLatch latch = new CountDownLatch(SAMPLE_CNT);
 
         mgr1.addMessageListener(topic, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 try {
                     latch.countDown();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index 7d3c5d6..93cd911 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.direct.DirectMessageReader;
 import org.apache.ignite.internal.direct.DirectMessageWriter;
 import org.apache.ignite.internal.managers.communication.GridIoManager;
 import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.communication.GridIoUserMessage;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
@@ -328,7 +329,7 @@ public class GridSpiTestContext implements IgniteSpiContext {
     @SuppressWarnings("deprecation")
     public void triggerMessage(ClusterNode node, Object msg) {
         for (GridMessageListener lsnr : msgLsnrs)
-            lsnr.onMessage(node.id(), msg);
+            lsnr.onMessage(node.id(), msg, GridIoPolicy.SYSTEM_POOL);
     }
 
     /** {@inheritDoc} */
@@ -667,7 +668,7 @@ public class GridSpiTestContext implements IgniteSpiContext {
         @SuppressWarnings({
             "SynchronizationOnLocalVariableOrMethodParameter", "ConstantConditions",
             "OverlyStrongTypeCast"})
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             GridIoUserMessage ioMsg = (GridIoUserMessage)msg;
 
             Object msgBody = ioMsg.body();

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
index 3296993..cd1c93c 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
@@ -63,7 +63,7 @@ public class HadoopShuffle extends HadoopComponent {
         super.start(ctx);
 
         ctx.kernalContext().io().addMessageListener(GridTopic.TOPIC_HADOOP_MSG, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 onMessageReceived(nodeId, (HadoopMessage)msg);
             }
         });

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
index 3dabc58..542adf0 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
@@ -135,7 +135,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
             msgTopic = new IgniteBiTuple<>(GridTopic.TOPIC_QUERY, tbl.identifierString() + '.' + getName());
 
             msgLsnr = new GridMessageListener() {
-                @Override public void onMessage(UUID nodeId, Object msg) {
+                @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                     GridSpinBusyLock l = desc.indexing().busyLock();
 
                     if (!l.enterBusy())

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 6b7ba75..fcf5f10 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -172,7 +172,7 @@ public class GridMapQueryExecutor {
         }, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
 
         ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 if (!busyLock.enterBusy())
                     return;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index b85fa61..85a7e0b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -169,7 +169,7 @@ public class GridReduceQueryExecutor {
         log = ctx.log(GridReduceQueryExecutor.class);
 
         ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 if (!busyLock.enterBusy())
                     return;
 


[7/8] ignite git commit: Merge master into ignite-2.1.2

Posted by sb...@apache.org.
Merge master into ignite-2.1.2


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/15613e2a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/15613e2a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/15613e2a

Branch: refs/heads/ignite-gg-12389
Commit: 15613e2af5e0a4a0014bb5c6d6f6915038b1be1a
Parents: d846197 5093660
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Tue Jul 4 12:39:38 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Tue Jul 4 12:39:38 2017 +0300

----------------------------------------------------------------------
 .../checkpoint/GridCheckpointManager.java       |  2 +-
 .../managers/communication/GridIoManager.java   | 10 +--
 .../communication/GridMessageListener.java      |  3 +-
 .../deployment/GridDeploymentCommunication.java |  4 +-
 .../eventstorage/GridEventStorageManager.java   |  4 +-
 .../processors/cache/GridCacheIoManager.java    | 85 +++++++++++---------
 .../GridCachePartitionExchangeManager.java      | 16 ++--
 .../cache/binary/BinaryMetadataTransport.java   |  4 +-
 .../GridDhtPartitionsExchangeFuture.java        | 11 +--
 .../cache/transactions/IgniteTxManager.java     |  2 +-
 .../processors/cluster/ClusterProcessor.java    |  2 +-
 .../continuous/GridContinuousProcessor.java     |  4 +-
 .../datastreamer/DataStreamProcessor.java       |  2 +-
 .../datastreamer/DataStreamerImpl.java          |  2 +-
 .../processors/igfs/IgfsDataManager.java        |  2 +-
 .../igfs/IgfsFragmentizerManager.java           |  4 +-
 .../processors/job/GridJobProcessor.java        |  8 +-
 .../GridMarshallerMappingProcessor.java         |  4 +-
 .../processors/query/GridQueryProcessor.java    |  2 +-
 .../handlers/task/GridTaskCommandHandler.java   |  4 +-
 .../processors/task/GridTaskProcessor.java      |  6 +-
 .../jobstealing/JobStealingCollisionSpi.java    |  2 +-
 ...idCommunicationManagerListenersSelfTest.java |  2 +-
 .../GridCommunicationSendMessageSelfTest.java   |  2 +-
 .../cache/GridCachePartitionedGetSelfTest.java  |  2 +-
 ...lerCacheClientRequestsMappingOnMissTest.java |  6 +-
 ...naryObjectMetadataExchangeMultinodeTest.java |  6 +-
 ...DeadlockDetectionMessageMarshallingTest.java |  2 +-
 .../communication/GridIoManagerBenchmark.java   |  4 +-
 .../communication/GridIoManagerBenchmark0.java  | 12 +--
 .../communication/GridCacheMessageSelfTest.java |  2 +-
 .../testframework/GridSpiTestContext.java       |  5 +-
 .../hadoop/shuffle/HadoopShuffle.java           |  2 +-
 .../query/h2/opt/GridH2IndexBase.java           |  2 +-
 .../query/h2/twostep/GridMapQueryExecutor.java  |  2 +-
 .../h2/twostep/GridReduceQueryExecutor.java     |  2 +-
 .../Apache.Ignite.Core/Impl/IgniteUtils.cs      | 12 ++-
 37 files changed, 129 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/15613e2a/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/15613e2a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/15613e2a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/15613e2a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 44eedb1,49cfcdd..2de3808
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@@ -19,7 -19,7 +19,6 @@@ package org.apache.ignite.internal.proc
  
  import java.util.ArrayList;
  import java.util.Arrays;
--import java.util.Collection;
  import java.util.HashMap;
  import java.util.Iterator;
  import java.util.List;
@@@ -80,17 -80,17 +79,12 @@@ import org.apache.ignite.internal.proce
  import org.apache.ignite.internal.processors.cache.transactions.IgniteTxState;
  import org.apache.ignite.internal.processors.cache.transactions.IgniteTxStateAware;
  import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
--import org.apache.ignite.internal.util.F0;
--import org.apache.ignite.internal.util.GridLeanSet;
  import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
  import org.apache.ignite.internal.util.typedef.CI1;
--import org.apache.ignite.internal.util.typedef.F;
--import org.apache.ignite.internal.util.typedef.P1;
  import org.apache.ignite.internal.util.typedef.X;
  import org.apache.ignite.internal.util.typedef.internal.CU;
  import org.apache.ignite.internal.util.typedef.internal.U;
  import org.apache.ignite.lang.IgniteBiInClosure;
--import org.apache.ignite.lang.IgnitePredicate;
  import org.apache.ignite.lang.IgniteUuid;
  import org.apache.ignite.thread.IgniteThread;
  import org.jetbrains.annotations.Nullable;
@@@ -355,19 -358,9 +357,19 @@@ public class GridCacheIoManager extend
                      if (log.isDebugEnabled())
                          log.debug(msg0.toString());
                  }
 -                else
 +                else {
                      U.error(log, msg0.toString());
  
 +                    try {
 +                        cacheMsg.onClassError(new IgniteCheckedException("Failed to find message handler for message: " + cacheMsg));
 +
-                         processFailedMessage(nodeId, cacheMsg, c);
++                        processFailedMessage(nodeId, cacheMsg, c, plc);
 +                    }
 +                    catch (Exception e) {
 +                        U.error(log, "Failed to process failed message: " + e, e);
 +                    }
 +                }
 +
                  return;
              }
  

http://git-wip-us.apache.org/repos/asf/ignite/blob/15613e2a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index f07119a,2d1aca0..93310e3
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@@ -863,7 -860,7 +863,7 @@@ public class GridCachePartitionExchange
       * @param nodes Nodes.
       */
      private void sendAllPartitions(Collection<ClusterNode> nodes) {
-         GridDhtPartitionsFullMessage m = createPartitionsFullMessage(nodes, null, null, null, null, true);
 -        GridDhtPartitionsFullMessage m = createPartitionsFullMessage(null, null);
++        GridDhtPartitionsFullMessage m = createPartitionsFullMessage(null, null, null, null);
  
          if (log.isDebugEnabled())
              log.debug("Sending all partitions [nodeIds=" + U.nodeIds(nodes) + ", msg=" + m + ']');
@@@ -886,26 -883,18 +886,24 @@@
      }
  
      /**
-      * @param nodes Target nodes.
       * @param exchId Non-null exchange ID if message is created for exchange.
       * @param lastVer Last version.
-      * @param compress {@code True} if it is possible to use compression for message.
       * @return Message.
       */
-     public GridDhtPartitionsFullMessage createPartitionsFullMessage(Collection<ClusterNode> nodes,
+     public GridDhtPartitionsFullMessage createPartitionsFullMessage(
 -        final @Nullable GridDhtPartitionExchangeId exchId,
 -        @Nullable GridCacheVersion lastVer) {
 +        @Nullable final GridDhtPartitionExchangeId exchId,
 +        @Nullable GridCacheVersion lastVer,
 +        @Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers,
-         @Nullable IgniteDhtPartitionsToReloadMap partsToReload,
-         final boolean compress) {
++        @Nullable IgniteDhtPartitionsToReloadMap partsToReload
++    ) {
          final GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(exchId,
              lastVer,
 -            exchId != null ? exchId.topologyVersion() : AffinityTopologyVersion.NONE);
 +            exchId != null ? exchId.topologyVersion() : AffinityTopologyVersion.NONE,
 +            partHistSuppliers,
 +            partsToReload
 +            );
  
-         m.compress(compress);
+         m.compress(true);
  
          final Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData = new HashMap<>();
  

http://git-wip-us.apache.org/repos/asf/ignite/blob/15613e2a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/15613e2a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 42f60b1,7471855..a1926ee
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@@ -1156,12 -1089,8 +1155,10 @@@ public class GridDhtPartitionsExchangeF
          GridCacheVersion last = lastVer.get();
  
          GridDhtPartitionsFullMessage m = cctx.exchange().createPartitionsFullMessage(
-             nodes,
              exchangeId(),
 -            last != null ? last : cctx.versions().last());
 +            last != null ? last : cctx.versions().last(),
 +            partHistSuppliers,
-             partsToReload,
-             compress);
++            partsToReload);
  
          if (exchangeOnChangeGlobalState && !F.isEmpty(changeGlobalStateExceptions))
              m.setExceptionsMap(changeGlobalStateExceptions);

http://git-wip-us.apache.org/repos/asf/ignite/blob/15613e2a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/15613e2a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/15613e2a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/15613e2a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/15613e2a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/15613e2a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/15613e2a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/15613e2a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
----------------------------------------------------------------------


[2/8] ignite git commit: Remove some unused parameters.

Posted by sb...@apache.org.
Remove some unused parameters.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3cc13eae
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3cc13eae
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3cc13eae

Branch: refs/heads/ignite-gg-12389
Commit: 3cc13eae99b2f4f86a9c679efd31a4e87a37d4fd
Parents: bdd31af
Author: sboikov <sb...@gridgain.com>
Authored: Mon Jul 3 16:03:37 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Jul 3 16:03:37 2017 +0300

----------------------------------------------------------------------
 .../cache/GridCachePartitionExchangeManager.java     | 15 ++++++---------
 .../preloader/GridDhtPartitionsExchangeFuture.java   | 11 ++++-------
 2 files changed, 10 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3cc13eae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index cbb07f9..2d1aca0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -860,7 +860,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      * @param nodes Nodes.
      */
     private void sendAllPartitions(Collection<ClusterNode> nodes) {
-        GridDhtPartitionsFullMessage m = createPartitionsFullMessage(nodes, null, null, true);
+        GridDhtPartitionsFullMessage m = createPartitionsFullMessage(null, null);
 
         if (log.isDebugEnabled())
             log.debug("Sending all partitions [nodeIds=" + U.nodeIds(nodes) + ", msg=" + m + ']');
@@ -883,21 +883,18 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
-     * @param nodes Target nodes.
      * @param exchId Non-null exchange ID if message is created for exchange.
      * @param lastVer Last version.
-     * @param compress {@code True} if it is possible to use compression for message.
      * @return Message.
      */
-    public GridDhtPartitionsFullMessage createPartitionsFullMessage(Collection<ClusterNode> nodes,
+    public GridDhtPartitionsFullMessage createPartitionsFullMessage(
         final @Nullable GridDhtPartitionExchangeId exchId,
-        @Nullable GridCacheVersion lastVer,
-        final boolean compress) {
+        @Nullable GridCacheVersion lastVer) {
         final GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(exchId,
             lastVer,
             exchId != null ? exchId.topologyVersion() : AffinityTopologyVersion.NONE);
 
-        m.compress(compress);
+        m.compress(true);
 
         final Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData = new HashMap<>();
 
@@ -917,7 +914,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 if (locMap != null) {
                     addFullPartitionsMap(m,
                         dupData,
-                        compress,
+                        true,
                         grp.groupId(),
                         locMap,
                         affCache.similarAffinityKey());
@@ -935,7 +932,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             if (map != null) {
                 addFullPartitionsMap(m,
                     dupData,
-                    compress,
+                    true,
                     top.groupId(),
                     map,
                     top.similarAffinityKey());

http://git-wip-us.apache.org/repos/asf/ignite/blob/3cc13eae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 3dc2242..7471855 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1083,17 +1083,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
-     * @param compress {@code True} if it is possible to use compression for message.
      * @return Message.
      */
-    private GridDhtPartitionsFullMessage createPartitionsMessage(Collection<ClusterNode> nodes, boolean compress) {
+    private GridDhtPartitionsFullMessage createPartitionsMessage() {
         GridCacheVersion last = lastVer.get();
 
         GridDhtPartitionsFullMessage m = cctx.exchange().createPartitionsFullMessage(
-            nodes,
             exchangeId(),
-            last != null ? last : cctx.versions().last(),
-            compress);
+            last != null ? last : cctx.versions().last());
 
         if (exchangeOnChangeGlobalState && !F.isEmpty(changeGlobalStateExceptions))
             m.setExceptionsMap(changeGlobalStateExceptions);
@@ -1106,7 +1103,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      * @throws IgniteCheckedException If failed.
      */
     private void sendAllPartitions(Collection<ClusterNode> nodes) throws IgniteCheckedException {
-        GridDhtPartitionsFullMessage m = createPartitionsMessage(nodes, true);
+        GridDhtPartitionsFullMessage m = createPartitionsMessage();
 
         assert !nodes.contains(cctx.localNode());
 
@@ -1390,7 +1387,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
             Map<Integer, Map<Integer, List<UUID>>> assignmentChange = fut.get();
 
-            GridDhtPartitionsFullMessage m = createPartitionsMessage(null, false);
+            GridDhtPartitionsFullMessage m = createPartitionsMessage();
 
             CacheAffinityChangeMessage msg = new CacheAffinityChangeMessage(exchId, m, assignmentChange);
 


[5/8] ignite git commit: Reduced amount of debug logging.

Posted by sb...@apache.org.
Reduced amount of debug logging.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/211caf15
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/211caf15
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/211caf15

Branch: refs/heads/ignite-gg-12389
Commit: 211caf15fa4d2f01ac09c7de414272b8c0d8d908
Parents: ae5ec94
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jul 4 11:21:49 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Jul 4 11:21:49 2017 +0300

----------------------------------------------------------------------
 .../processors/affinity/GridAffinityAssignmentCache.java     | 8 +++++++-
 .../processors/cache/GridCachePartitionExchangeManager.java  | 8 ++++++--
 2 files changed, 13 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/211caf15/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index 71ec3ea..a8c6c59 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -431,8 +431,10 @@ public class GridAffinityAssignmentCache {
 
     /**
      * Dumps debug information.
+     *
+     * @return {@code True} if there are pending futures.
      */
-    public void dumpDebugInfo() {
+    public boolean dumpDebugInfo() {
         if (!readyFuts.isEmpty()) {
             U.warn(log, "First 3 pending affinity ready futures [grp=" + cacheOrGrpName +
                 ", total=" + readyFuts.size() +
@@ -446,7 +448,11 @@ public class GridAffinityAssignmentCache {
                 if (++cnt == 3)
                     break;
             }
+
+            return true;
         }
+
+        return false;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/211caf15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 2b5a4ff..f07119a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1636,6 +1636,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             }
         }
 
+        int affDumpCnt = 0;
+
         for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
             if (grp.isLocal())
                 continue;
@@ -1647,8 +1649,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
             GridAffinityAssignmentCache aff = grp.affinity();
 
-            if (aff != null)
-                aff.dumpDebugInfo();
+            if (aff != null && affDumpCnt < 5) {
+                if (aff.dumpDebugInfo())
+                    affDumpCnt++;
+            }
         }
     }
 


[8/8] ignite git commit: Merge remote-tracking branch 'remotes/community/ignite-2.1' into ignite-gg-12389

Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/community/ignite-2.1' into ignite-gg-12389


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/87cb2ae6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/87cb2ae6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/87cb2ae6

Branch: refs/heads/ignite-gg-12389
Commit: 87cb2ae604fb722892c3383cf1b70966c0f3f16e
Parents: 5788649 15613e2
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jul 4 17:11:48 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Jul 4 17:11:48 2017 +0300

----------------------------------------------------------------------
 .../checkpoint/GridCheckpointManager.java       |  2 +-
 .../managers/communication/GridIoManager.java   | 10 +--
 .../communication/GridMessageListener.java      |  3 +-
 .../deployment/GridDeploymentCommunication.java |  4 +-
 .../eventstorage/GridEventStorageManager.java   |  4 +-
 .../affinity/GridAffinityAssignmentCache.java   |  8 +-
 .../processors/cache/GridCacheIoManager.java    | 79 +++++++++++---------
 .../GridCachePartitionExchangeManager.java      | 24 +++---
 .../cache/binary/BinaryMetadataTransport.java   |  4 +-
 .../GridDhtPartitionsExchangeFuture.java        | 11 +--
 .../cache/transactions/IgniteTxManager.java     |  2 +-
 .../processors/cluster/ClusterProcessor.java    |  2 +-
 .../continuous/GridContinuousProcessor.java     |  4 +-
 .../datastreamer/DataStreamProcessor.java       |  2 +-
 .../datastreamer/DataStreamerImpl.java          |  2 +-
 .../processors/igfs/IgfsDataManager.java        |  2 +-
 .../igfs/IgfsFragmentizerManager.java           |  4 +-
 .../processors/job/GridJobProcessor.java        |  8 +-
 .../GridMarshallerMappingProcessor.java         |  4 +-
 .../processors/query/GridQueryProcessor.java    |  2 +-
 .../handlers/task/GridTaskCommandHandler.java   |  4 +-
 .../processors/task/GridTaskProcessor.java      |  6 +-
 .../jobstealing/JobStealingCollisionSpi.java    |  2 +-
 ...idCommunicationManagerListenersSelfTest.java |  2 +-
 .../GridCommunicationSendMessageSelfTest.java   |  2 +-
 .../cache/GridCachePartitionedGetSelfTest.java  |  2 +-
 ...lerCacheClientRequestsMappingOnMissTest.java |  6 +-
 ...naryObjectMetadataExchangeMultinodeTest.java |  6 +-
 ...DeadlockDetectionMessageMarshallingTest.java |  2 +-
 .../communication/GridIoManagerBenchmark.java   |  4 +-
 .../communication/GridIoManagerBenchmark0.java  | 12 +--
 .../communication/GridCacheMessageSelfTest.java |  2 +-
 .../testframework/GridSpiTestContext.java       |  5 +-
 .../hadoop/shuffle/HadoopShuffle.java           |  2 +-
 .../query/h2/opt/GridH2IndexBase.java           |  2 +-
 .../query/h2/twostep/GridMapQueryExecutor.java  |  2 +-
 .../h2/twostep/GridReduceQueryExecutor.java     |  2 +-
 .../Apache.Ignite.Core/Impl/IgniteUtils.cs      | 12 ++-
 .../generator/ConfigurationGenerator.js         |  9 ++-
 39 files changed, 148 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/87cb2ae6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/87cb2ae6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/87cb2ae6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index c9182ec,a1926ee..d2761cb
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@@ -1269,12 -1158,10 +1268,10 @@@ public class GridDhtPartitionsExchangeF
              exchangeId(),
              last != null ? last : cctx.versions().last(),
              partHistSuppliers,
-             partsToReload,
-             compress);
+             partsToReload);
  
 -        if (exchangeOnChangeGlobalState && !F.isEmpty(changeGlobalStateExceptions))
 -            m.setExceptionsMap(changeGlobalStateExceptions);
 +        if (stateChangeExchange() && !F.isEmpty(changeGlobalStateExceptions))
 +            m.setErrorsMap(changeGlobalStateExceptions);
  
          return m;
      }

http://git-wip-us.apache.org/repos/asf/ignite/blob/87cb2ae6/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/87cb2ae6/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/87cb2ae6/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
----------------------------------------------------------------------


[4/8] ignite git commit: IGNITE-5628 .NET: Fix jvm.dll lookup paths for JRE

Posted by sb...@apache.org.
IGNITE-5628 .NET: Fix jvm.dll lookup paths for JRE


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5093660d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5093660d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5093660d

Branch: refs/heads/ignite-gg-12389
Commit: 5093660d758ad4149d5cd135f3cad3dfee0ae6e4
Parents: c08849c
Author: Pavel Tupitsyn <pt...@apache.org>
Authored: Mon Jul 3 17:58:47 2017 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Mon Jul 3 17:58:47 2017 +0300

----------------------------------------------------------------------
 .../dotnet/Apache.Ignite.Core/Impl/IgniteUtils.cs       | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5093660d/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteUtils.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteUtils.cs
index b024345..f3bdd2b 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteUtils.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteUtils.cs
@@ -46,7 +46,17 @@ namespace Apache.Ignite.Core.Impl
         private const string EnvJavaHome = "JAVA_HOME";
 
         /** Lookup paths. */
-        private static readonly string[] JvmDllLookupPaths = {@"jre\bin\server", @"jre\bin\default"};
+        private static readonly string[] JvmDllLookupPaths =
+        {
+            // JRE paths
+            @"bin\server",
+            @"bin\client",
+
+            // JDK paths
+            @"jre\bin\server",
+            @"jre\bin\client",
+            @"jre\bin\default"
+        };
 
         /** Registry lookup paths. */
         private static readonly string[] JreRegistryKeys =


[6/8] ignite git commit: IGNITE-5683 Fixed missing fully qualified class names for generated indexed types on Models screen.

Posted by sb...@apache.org.
IGNITE-5683 Fixed missing fully qualified class names for generated indexed types on Models screen.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d8461977
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d8461977
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d8461977

Branch: refs/heads/ignite-gg-12389
Commit: d84619775e3960f30890a467b897315deed20ab7
Parents: 211caf1
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Tue Jul 4 15:44:27 2017 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Tue Jul 4 15:44:27 2017 +0700

----------------------------------------------------------------------
 .../configuration/generator/ConfigurationGenerator.js       | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d8461977/modules/web-console/frontend/app/modules/configuration/generator/ConfigurationGenerator.js
----------------------------------------------------------------------
diff --git a/modules/web-console/frontend/app/modules/configuration/generator/ConfigurationGenerator.js b/modules/web-console/frontend/app/modules/configuration/generator/ConfigurationGenerator.js
index a903ec4..f850dce 100644
--- a/modules/web-console/frontend/app/modules/configuration/generator/ConfigurationGenerator.js
+++ b/modules/web-console/frontend/app/modules/configuration/generator/ConfigurationGenerator.js
@@ -1653,8 +1653,11 @@ export default class IgniteConfigurationGenerator {
     static domainModelGeneral(domain, cfg = this.domainConfigurationBean(domain)) {
         switch (cfg.valueOf('queryMetadata')) {
             case 'Annotations':
-                if (_.nonNil(domain.keyType) && _.nonNil(domain.valueType))
-                    cfg.varArgProperty('indexedTypes', 'indexedTypes', [domain.keyType, domain.valueType], 'java.lang.Class');
+                if (_.nonNil(domain.keyType) && _.nonNil(domain.valueType)) {
+                    cfg.varArgProperty('indexedTypes', 'indexedTypes',
+                        [javaTypes.fullClassName(domain.keyType), javaTypes.fullClassName(domain.valueType)],
+                        'java.lang.Class');
+                }
 
                 break;
             case 'Configuration':
@@ -1864,7 +1867,7 @@ export default class IgniteConfigurationGenerator {
     static cacheQuery(cache, domains, available, ccfg = this.cacheConfigurationBean(cache)) {
         const indexedTypes = _.reduce(domains, (acc, domain) => {
             if (domain.queryMetadata === 'Annotations')
-                acc.push(domain.keyType, domain.valueType);
+                acc.push(javaTypes.fullClassName(domain.keyType), javaTypes.fullClassName(domain.valueType));
 
             return acc;
         }, []);


[3/8] ignite git commit: IGNITE-5613 - Fixed race on local sequence increment and distributed update

Posted by sb...@apache.org.
IGNITE-5613 - Fixed race on local sequence increment and distributed update


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c08849cd
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c08849cd
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c08849cd

Branch: refs/heads/ignite-gg-12389
Commit: c08849cdb362f1c699afb5d04383fa3200193539
Parents: 3cc13eae
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Mon Jul 3 17:05:48 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Mon Jul 3 17:06:38 2017 +0300

----------------------------------------------------------------------
 .../GridCacheAtomicSequenceImpl.java            | 55 ++++++++++++--------
 ...titionedAtomicSequenceMultiThreadedTest.java | 32 ++++++++++++
 2 files changed, 64 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c08849cd/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
index 31ec16f..0354a17 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
@@ -385,39 +385,48 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
 
                     long newUpBound;
 
-                    curLocVal = locVal;
+                    // Even though we hold a transaction lock here, we must hold the local update lock here as well
+                    // because we mutate multipe variables (locVal and upBound).
+                    localUpdate.lock();
 
-                    // If local range was already reserved in another thread.
-                    if (curLocVal + l <= upBound) {
-                        locVal = curLocVal + l;
+                    try {
+                        curLocVal = locVal;
 
-                        return updated ? curLocVal + l : curLocVal;
-                    }
+                        // If local range was already reserved in another thread.
+                        if (curLocVal + l <= upBound) {
+                            locVal = curLocVal + l;
 
-                    long curGlobalVal = seq.get();
+                            return updated ? curLocVal + l : curLocVal;
+                        }
 
-                    long newLocVal;
+                        long curGlobalVal = seq.get();
 
-                    /* We should use offset because we already reserved left side of range.*/
-                    long off = batchSize > 1 ? batchSize - 1 : 1;
+                        long newLocVal;
 
-                    // Calculate new values for local counter, global counter and upper bound.
-                    if (curLocVal + l >= curGlobalVal) {
-                        newLocVal = curLocVal + l;
+                        /* We should use offset because we already reserved left side of range.*/
+                        long off = batchSize > 1 ? batchSize - 1 : 1;
 
-                        newUpBound = newLocVal + off;
-                    }
-                    else {
-                        newLocVal = curGlobalVal;
+                        // Calculate new values for local counter, global counter and upper bound.
+                        if (curLocVal + l >= curGlobalVal) {
+                            newLocVal = curLocVal + l;
 
-                        newUpBound = newLocVal + off;
-                    }
+                            newUpBound = newLocVal + off;
+                        }
+                        else {
+                            newLocVal = curGlobalVal;
 
-                    locVal = newLocVal;
-                    upBound = newUpBound;
+                            newUpBound = newLocVal + off;
+                        }
 
-                    if (updated)
-                        curLocVal = newLocVal;
+                        locVal = newLocVal;
+                        upBound = newUpBound;
+
+                        if (updated)
+                            curLocVal = newLocVal;
+                    }
+                    finally {
+                        localUpdate.unlock();
+                    }
 
                     // Global counter must be more than reserved upper bound.
                     seq.set(newUpBound + 1);

http://git-wip-us.apache.org/repos/asf/ignite/blob/c08849cd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicSequenceMultiThreadedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicSequenceMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicSequenceMultiThreadedTest.java
index 945650d..4db9bd3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicSequenceMultiThreadedTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicSequenceMultiThreadedTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.datastructures.partitioned;
 
 import java.util.Random;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import org.apache.ignite.IgniteAtomicSequence;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CacheMode;
@@ -26,6 +27,7 @@ import org.apache.ignite.configuration.AtomicConfiguration;
 import org.apache.ignite.internal.processors.cache.datastructures.IgniteAtomicsAbstractTest;
 import org.apache.ignite.internal.processors.datastructures.GridCacheAtomicSequenceImpl;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
 
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 
@@ -281,6 +283,36 @@ public class GridCachePartitionedAtomicSequenceMultiThreadedTest extends IgniteA
     }
 
     /**
+     * @throws Exception if failed.
+     */
+    public void testMultipleSequences() throws Exception {
+        final int seqCnt = 5;
+        final int threadCnt = 5;
+        final int incCnt = 1_000;
+
+        final IgniteAtomicSequence[] seqs = new IgniteAtomicSequence[seqCnt];
+
+        String seqName = UUID.randomUUID().toString();
+
+        for (int i = 0; i < seqs.length; i++)
+            seqs[i] = grid(0).atomicSequence(seqName, 0, true);
+
+        GridTestUtils.runMultiThreaded(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                for (int i = 0; i < incCnt; i++) {
+                    for (IgniteAtomicSequence seq : seqs)
+                        seq.incrementAndGet();
+                }
+
+                return null;
+            }
+        }, threadCnt, "load");
+
+        for (IgniteAtomicSequence seq : seqs)
+            assertEquals(seqCnt * threadCnt * incCnt, seq.get());
+    }
+
+    /**
      * Executes given closure in a given number of threads given number of times.
      *
      * @param c Closure to execute.