You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2016/03/22 18:36:43 UTC

[1/7] ignite git commit: ignite-2797 Incorrect timeout calculation for optimistic transactions fixed

Repository: ignite
Updated Branches:
  refs/heads/ignite-2801 863a6c557 -> 5e772b02a


ignite-2797 Incorrect timeout calculation for optimistic transactions fixed


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5be5fa08
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5be5fa08
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5be5fa08

Branch: refs/heads/ignite-2801
Commit: 5be5fa080c7160184c9fbfa9b8b9b748f80c3078
Parents: fa356e3
Author: agura <ag...@gridgain.com>
Authored: Thu Mar 17 15:18:44 2016 +0300
Committer: agura <ag...@gridgain.com>
Committed: Mon Mar 21 17:36:42 2016 +0300

----------------------------------------------------------------------
 .../internal/processors/cache/transactions/IgniteTxManager.java  | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5be5fa08/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 2c1205a..e96a472 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -1394,12 +1394,12 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
         throws IgniteCheckedException {
         assert tx.optimistic() || !tx.local();
 
-        long remainingTime = U.currentTimeMillis() - (tx.startTime() + tx.timeout());
+        long remainingTime = tx.timeout() - (U.currentTimeMillis() - tx.startTime());
 
         // For serializable transactions, failure to acquire lock means
         // that there is a serializable conflict. For all other isolation levels,
         // we wait for the lock.
-        long timeout = tx.timeout() == 0 ? 0 : remainingTime;
+        long timeout = tx.timeout() == 0 ? 0 : (remainingTime < 0 ? 0 : remainingTime);
 
         GridCacheVersion serOrder = (tx.serializable() && tx.optimistic()) ? tx.nearXidVersion() : null;
 


[5/7] ignite git commit: Added ability to dump comm SPI recovery descriptors

Posted by av...@apache.org.
Added ability to dump comm SPI recovery descriptors


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f76a3133
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f76a3133
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f76a3133

Branch: refs/heads/ignite-2801
Commit: f76a3133f40e67621bd4dec54e173f04cbe21485
Parents: d182da2
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Tue Mar 22 15:58:49 2016 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Tue Mar 22 15:58:49 2016 +0300

----------------------------------------------------------------------
 .../ignite/internal/util/nio/GridNioServer.java     | 13 ++++++++-----
 .../spi/communication/tcp/TcpCommunicationSpi.java  | 16 ++++++++++++++++
 2 files changed, 24 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f76a3133/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 9886bc6..b07ae19 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -1423,16 +1423,19 @@ public class GridNioServer<T> {
                                 for (SelectionKey key : keys) {
                                     GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
 
-                                    sb.append("    Conn [")
+                                    sb.append("    Connection info [")
                                         .append("rmtAddr=").append(ses.remoteAddress())
                                         .append(", locAddr=").append(ses.localAddress())
                                         .append(", bytesRcvd=").append(ses.bytesReceived())
                                         .append(", bytesSent=").append(ses.bytesSent());
 
-                                    if (ses.recoveryDescriptor() != null) {
-                                        sb.append(", msgsSent=").append(ses.recoveryDescriptor().sent())
-                                            .append(", msgsAckedByRmt=").append(ses.recoveryDescriptor().acked())
-                                            .append(", msgsRcvd=").append(ses.recoveryDescriptor().received());
+                                    GridNioRecoveryDescriptor desc = ses.recoveryDescriptor();
+
+                                    if (desc != null) {
+                                        sb.append(", msgsSent=").append(desc.sent())
+                                            .append(", msgsAckedByRmt=").append(desc.acked())
+                                            .append(", msgsRcvd=").append(desc.received())
+                                            .append(", descIdHash=").append(System.identityHashCode(desc));
                                     }
                                     else
                                         sb.append(", recoveryDesc=null");

http://git-wip-us.apache.org/repos/asf/ignite/blob/f76a3133/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 2f6b8ba..281b38e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -1381,6 +1381,22 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
     /** {@inheritDoc} */
     @Override public void dumpStats() {
+        StringBuilder sb = new StringBuilder("Communication SPI recovery descriptors: ").append(U.nl());
+
+        for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> entry : recoveryDescs.entrySet()) {
+            GridNioRecoveryDescriptor desc = entry.getValue();
+
+            sb.append("    [key=").append(entry.getKey())
+                .append(", msgsSent=").append(desc.sent())
+                .append(", msgsAckedByRmt=").append(desc.acked())
+                .append(", msgsRcvd=").append(desc.received())
+                .append(", descIdHash=").append(System.identityHashCode(desc))
+                .append(']').append(U.nl());
+        }
+
+        if (log.isInfoEnabled())
+            log.info(sb.toString());
+
         GridNioServer<Message> nioSrvr1 = nioSrvr;
 
         if (nioSrvr1 != null)


[2/7] ignite git commit: Fixed CacheContinuousQueryLostPartitionTest.testAtomicClientEvent test.

Posted by av...@apache.org.
Fixed CacheContinuousQueryLostPartitionTest.testAtomicClientEvent test.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/aafd6f77
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/aafd6f77
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/aafd6f77

Branch: refs/heads/ignite-2801
Commit: aafd6f77feb03b1f56efddfbef8f116b65a5ce4e
Parents: 5be5fa0
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Mon Mar 21 18:52:08 2016 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Mon Mar 21 18:53:13 2016 +0300

----------------------------------------------------------------------
 .../query/continuous/CacheContinuousQueryLostPartitionTest.java    | 2 --
 1 file changed, 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/aafd6f77/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java
index f4659dc..025dd80 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java
@@ -140,8 +140,6 @@ public class CacheContinuousQueryLostPartitionTest extends GridCommonAbstractTes
         // node2 now becomes the primary for the key.
         stopGrid(0);
 
-        awaitPartitionMapExchange();
-
         cache2.put(key, "2");
 
         // Sanity check.


[4/7] ignite git commit: Fixed TCK tests.

Posted by av...@apache.org.
Fixed TCK tests.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d182da2a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d182da2a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d182da2a

Branch: refs/heads/ignite-2801
Commit: d182da2a6d2821182ae6e636711cd15a44e4ca21
Parents: fca47ec
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Tue Mar 22 14:03:42 2016 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Tue Mar 22 14:03:42 2016 +0300

----------------------------------------------------------------------
 .../query/continuous/CacheContinuousQueryManager.java  | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d182da2a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 869a51b..c01f636 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -489,7 +489,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
             false,
             false,
             loc,
-            keepBinary);
+            keepBinary,
+            false);
     }
 
     /**
@@ -528,6 +529,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
             true,
             notifyExisting,
             loc,
+            false,
             false);
     }
 
@@ -608,6 +610,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
      * @param internal Internal flag.
      * @param notifyExisting Notify existing flag.
      * @param loc Local flag.
+     * @param onStart Waiting topology exchange.
      * @return Continuous routine ID.
      * @throws IgniteCheckedException In case of error.
      */
@@ -619,7 +622,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         boolean internal,
         boolean notifyExisting,
         boolean loc,
-        final boolean keepBinary) throws IgniteCheckedException
+        final boolean keepBinary,
+        boolean onStart) throws IgniteCheckedException
     {
         cctx.checkSecurity(SecurityPermission.CACHE_READ);
 
@@ -650,7 +654,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
             pred).get();
 
         try {
-            if (hnd.isQuery() && cctx.userCache())
+            if (hnd.isQuery() && cctx.userCache() && !onStart)
                 hnd.waitTopologyFuture(cctx.kernalContext());
         }
         catch (IgniteCheckedException e) {
@@ -905,7 +909,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                 false,
                 false,
                 false,
-                keepBinary
+                keepBinary,
+                onStart
             );
         }
 


[6/7] ignite git commit: Added ability to dump direct message reader and writer

Posted by av...@apache.org.
Added ability to dump direct message reader and writer


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/660aa2f7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/660aa2f7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/660aa2f7

Branch: refs/heads/ignite-2801
Commit: 660aa2f7b3948d34f20baac516d29e43984f4ede
Parents: f76a313
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Tue Mar 22 17:03:03 2016 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Tue Mar 22 17:03:03 2016 +0300

----------------------------------------------------------------------
 .../ignite/internal/direct/DirectMessageReader.java      | 11 +++++++++++
 .../ignite/internal/direct/DirectMessageWriter.java      | 11 +++++++++++
 .../ignite/internal/direct/state/DirectMessageState.java |  9 +++++++++
 .../direct/stream/v1/DirectByteBufferStreamImplV1.java   |  6 ++++++
 .../direct/stream/v2/DirectByteBufferStreamImplV2.java   |  8 ++++++++
 .../ignite/internal/util/nio/GridDirectParser.java       |  4 ++--
 .../apache/ignite/internal/util/nio/GridNioServer.java   |  6 ++++++
 7 files changed, 53 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/660aa2f7/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
index e0b7b22..323d142 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.direct.state.DirectMessageStateItem;
 import org.apache.ignite.internal.direct.stream.DirectByteBufferStream;
 import org.apache.ignite.internal.direct.stream.v1.DirectByteBufferStreamImplV1;
 import org.apache.ignite.internal.direct.stream.v2.DirectByteBufferStreamImplV2;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteOutClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.extensions.communication.Message;
@@ -373,6 +374,11 @@ public class DirectMessageReader implements MessageReader {
         state.reset();
     }
 
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(DirectMessageReader.class, this);
+    }
+
     /**
      */
     private static class StateItem implements DirectMessageStateItem {
@@ -407,5 +413,10 @@ public class DirectMessageReader implements MessageReader {
         @Override public void reset() {
             state = 0;
         }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(StateItem.class, this);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/660aa2f7/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
index c960ad4..c381711 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.direct.state.DirectMessageStateItem;
 import org.apache.ignite.internal.direct.stream.DirectByteBufferStream;
 import org.apache.ignite.internal.direct.stream.v1.DirectByteBufferStreamImplV1;
 import org.apache.ignite.internal.direct.stream.v2.DirectByteBufferStreamImplV2;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteOutClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.extensions.communication.Message;
@@ -332,6 +333,11 @@ public class DirectMessageWriter implements MessageWriter {
         state.reset();
     }
 
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(DirectMessageWriter.class, this);
+    }
+
     /**
      */
     private static class StateItem implements DirectMessageStateItem {
@@ -369,5 +375,10 @@ public class DirectMessageWriter implements MessageWriter {
             state = 0;
             hdrWritten = false;
         }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(StateItem.class, this);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/660aa2f7/modules/core/src/main/java/org/apache/ignite/internal/direct/state/DirectMessageState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/state/DirectMessageState.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/state/DirectMessageState.java
index a61bb30..58f625f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/state/DirectMessageState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/state/DirectMessageState.java
@@ -18,6 +18,9 @@
 package org.apache.ignite.internal.direct.state;
 
 import java.lang.reflect.Array;
+import java.util.Arrays;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteOutClosure;
 
 /**
@@ -28,6 +31,7 @@ public class DirectMessageState<T extends DirectMessageStateItem> {
     private static final int INIT_SIZE = 10;
 
     /** Item factory. */
+    @GridToStringExclude
     private final IgniteOutClosure<T> factory;
 
     /** Stack array. */
@@ -95,4 +99,9 @@ public class DirectMessageState<T extends DirectMessageStateItem> {
 
         stack[0].reset();
     }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(DirectMessageState.class, this, "stack", Arrays.toString(stack));
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/660aa2f7/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v1/DirectByteBufferStreamImplV1.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v1/DirectByteBufferStreamImplV1.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v1/DirectByteBufferStreamImplV1.java
index 2ca3a23..9b21058 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v1/DirectByteBufferStreamImplV1.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v1/DirectByteBufferStreamImplV1.java
@@ -28,6 +28,7 @@ import java.util.NoSuchElementException;
 import java.util.UUID;
 import org.apache.ignite.internal.direct.stream.DirectByteBufferStream;
 import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.extensions.communication.Message;
@@ -1349,6 +1350,11 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
         };
     }
 
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(DirectByteBufferStreamImplV1.class, this);
+    }
+
     /**
      * Array creator.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/660aa2f7/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v2/DirectByteBufferStreamImplV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v2/DirectByteBufferStreamImplV2.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v2/DirectByteBufferStreamImplV2.java
index 489e7e1..97da92d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v2/DirectByteBufferStreamImplV2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v2/DirectByteBufferStreamImplV2.java
@@ -29,6 +29,8 @@ import java.util.RandomAccess;
 import java.util.UUID;
 import org.apache.ignite.internal.direct.stream.DirectByteBufferStream;
 import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.extensions.communication.Message;
@@ -218,6 +220,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
     private static final Object NULL = new Object();
 
     /** */
+    @GridToStringExclude
     private final MessageFactory msgFactory;
 
     /** */
@@ -1572,6 +1575,11 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(DirectByteBufferStreamImplV2.class, this);
+    }
+
     /**
      * Array creator.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/660aa2f7/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
index aa88808..aaa7f3d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
@@ -31,10 +31,10 @@ import org.jetbrains.annotations.Nullable;
  */
 public class GridDirectParser implements GridNioParser {
     /** Message metadata key. */
-    private static final int MSG_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
+    static final int MSG_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
 
     /** Reader metadata key. */
-    private static final int READER_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
+    static final int READER_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
 
     /** */
     private final MessageFactory msgFactory;

http://git-wip-us.apache.org/repos/asf/ignite/blob/660aa2f7/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index b07ae19..3ae8312 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -62,6 +62,7 @@ import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
@@ -1423,9 +1424,14 @@ public class GridNioServer<T> {
                                 for (SelectionKey key : keys) {
                                     GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
 
+                                    MessageWriter writer = ses.meta(MSG_WRITER.ordinal());
+                                    MessageReader reader = ses.meta(GridDirectParser.READER_META_KEY);
+
                                     sb.append("    Connection info [")
                                         .append("rmtAddr=").append(ses.remoteAddress())
                                         .append(", locAddr=").append(ses.localAddress())
+                                        .append(", msgWriter=").append(writer != null ? writer.toString() : "null")
+                                        .append(", msgReader=").append(reader != null ? reader.toString() : "null")
                                         .append(", bytesRcvd=").append(ses.bytesReceived())
                                         .append(", bytesSent=").append(ses.bytesSent());
 


[7/7] ignite git commit: Merge remote-tracking branch 'remotes/upstream/gridgain-7.5.9' into ignite-2801

Posted by av...@apache.org.
Merge remote-tracking branch 'remotes/upstream/gridgain-7.5.9' into ignite-2801


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5e772b02
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5e772b02
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5e772b02

Branch: refs/heads/ignite-2801
Commit: 5e772b02a5586f20b08e06d9246af768981e2c48
Parents: 863a6c5 660aa2f
Author: Anton Vinogradov <av...@apache.org>
Authored: Tue Mar 22 20:35:38 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Tue Mar 22 20:35:38 2016 +0300

----------------------------------------------------------------------
 .../internal/GridEventConsumeHandler.java       |   3 +-
 .../internal/GridMessageListenHandler.java      |   3 +-
 .../internal/direct/DirectMessageReader.java    |  11 +
 .../internal/direct/DirectMessageWriter.java    |  11 +
 .../direct/state/DirectMessageState.java        |   9 +
 .../stream/v1/DirectByteBufferStreamImplV1.java |   6 +
 .../stream/v2/DirectByteBufferStreamImplV2.java |   8 +
 .../continuous/CacheContinuousQueryHandler.java |  88 +++-
 .../continuous/CacheContinuousQueryManager.java |  23 +-
 .../cache/transactions/IgniteTxManager.java     |   4 +-
 .../continuous/GridContinuousHandler.java       |   4 +-
 .../continuous/GridContinuousProcessor.java     |  27 +-
 .../StartRoutineAckDiscoveryMessage.java        |  22 +-
 .../StartRoutineDiscoveryMessage.java           |  22 +-
 .../internal/util/nio/GridDirectParser.java     |   4 +-
 .../ignite/internal/util/nio/GridNioServer.java |  19 +-
 .../communication/tcp/TcpCommunicationSpi.java  |  16 +
 .../CacheContinuousQueryLostPartitionTest.java  |   2 -
 .../GridCacheContinuousQueryConcurrentTest.java | 466 +++++++++++++++++++
 .../IgniteCacheQuerySelfTestSuite.java          |   2 +
 20 files changed, 687 insertions(+), 63 deletions(-)
----------------------------------------------------------------------



[3/7] ignite git commit: Fixed IGNITE-2791 "Continuous query listener is not notified during concurrent key put and registration."

Posted by av...@apache.org.
Fixed IGNITE-2791 "Continuous query listener is not notified during concurrent key put and registration."


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fca47ec7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fca47ec7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fca47ec7

Branch: refs/heads/ignite-2801
Commit: fca47ec77e4ec6753765e531cd557a279d10d125
Parents: aafd6f7
Author: Tikhonov Nikolay <ti...@gmail.com>
Authored: Mon Mar 21 23:44:56 2016 +0300
Committer: Tikhonov Nikolay <ti...@gmail.com>
Committed: Mon Mar 21 23:49:17 2016 +0300

----------------------------------------------------------------------
 .../internal/GridEventConsumeHandler.java       |   3 +-
 .../internal/GridMessageListenHandler.java      |   3 +-
 .../continuous/CacheContinuousQueryHandler.java |  88 +++-
 .../continuous/CacheContinuousQueryManager.java |  12 +
 .../continuous/GridContinuousHandler.java       |   4 +-
 .../continuous/GridContinuousProcessor.java     |  27 +-
 .../StartRoutineAckDiscoveryMessage.java        |  22 +-
 .../StartRoutineDiscoveryMessage.java           |  22 +-
 .../GridCacheContinuousQueryConcurrentTest.java | 466 +++++++++++++++++++
 .../IgniteCacheQuerySelfTestSuite.java          |   2 +
 10 files changed, 600 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/fca47ec7/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index e2b1184..19bf1a7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -136,7 +136,8 @@ class GridEventConsumeHandler implements GridContinuousHandler {
     }
 
     /** {@inheritDoc} */
-    @Override public void updateCounters(AffinityTopologyVersion topVer, Map<Integer, Long> cntrs) {
+    @Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrsPerNode,
+        Map<Integer, Long> cntrs) {
         // No-op.
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/fca47ec7/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
index 402365c..0ac6877 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
@@ -125,7 +125,8 @@ public class GridMessageListenHandler implements GridContinuousHandler {
     }
 
     /** {@inheritDoc} */
-    @Override public void updateCounters(AffinityTopologyVersion topVer, Map<Integer, Long> cntrs) {
+    @Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrsPerNode,
+        Map<Integer, Long> cntrs) {
         // No-op.
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/fca47ec7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 10fbd89..6243af7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
@@ -72,6 +73,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentLinkedDeque8;
 
@@ -146,10 +148,13 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     private transient int cacheId;
 
     /** */
-    private Map<Integer, Long> initUpdCntrs;
+    private transient volatile Map<Integer, Long> initUpdCntrs;
 
     /** */
-    private AffinityTopologyVersion initTopVer;
+    private transient volatile Map<UUID, Map<Integer, Long>> initUpdCntrsPerNode;
+
+    /** */
+    private transient volatile AffinityTopologyVersion initTopVer;
 
     /** */
     private transient boolean ignoreClsNotFound;
@@ -264,9 +269,11 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     }
 
     /** {@inheritDoc} */
-    @Override public void updateCounters(AffinityTopologyVersion topVer, Map<Integer, Long> cntrs) {
-        this.initTopVer = topVer;
+    @Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrsPerNode,
+        Map<Integer, Long> cntrs) {
+        this.initUpdCntrsPerNode = cntrsPerNode;
         this.initUpdCntrs = cntrs;
+        this.initTopVer = topVer;
     }
 
     /** {@inheritDoc} */
@@ -296,20 +303,6 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
         assert !skipPrimaryCheck || loc;
 
-        final GridCacheContext<K, V> cctx = cacheContext(ctx);
-
-        if (!internal && cctx != null && initUpdCntrs != null) {
-            Map<Integer, Long> map = cctx.topology().updateCounters();
-
-            for (Map.Entry<Integer, Long> e : map.entrySet()) {
-                Long cntr0 = initUpdCntrs.get(e.getKey());
-                Long cntr1 = e.getValue();
-
-                if (cntr0 == null || cntr1 > cntr0)
-                    initUpdCntrs.put(e.getKey(), cntr1);
-            }
-        }
-
         CacheContinuousQueryListener<K, V> lsnr = new CacheContinuousQueryListener<K, V>() {
             @Override public void onExecution() {
                 if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
@@ -561,6 +554,20 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
             entry.prepareMarshal(cctx);
     }
 
+    /**
+     * Wait topology.
+     */
+    public void waitTopologyFuture(GridKernalContext ctx) throws IgniteCheckedException {
+        GridCacheContext<K, V> cctx = cacheContext(ctx);
+
+        if (!cctx.isLocal()) {
+            cacheContext(ctx).affinity().affinityReadyFuture(initTopVer).get();
+
+            for (int partId = 0; partId < cacheContext(ctx).affinity().partitions(); partId++)
+                getOrCreatePartitionRecovery(ctx, partId);
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public void onListenerRegistered(UUID routineId, GridKernalContext ctx) {
         // No-op.
@@ -668,19 +675,54 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         if (e.updateCounter() == -1L)
             return F.asList(e);
 
-        PartitionRecovery rec = rcvs.get(e.partition());
+        PartitionRecovery rec = getOrCreatePartitionRecovery(ctx, e.partition());
+
+        return rec.collectEntries(e);
+    }
+
+    /**
+     * @param ctx Context.
+     * @param partId Partition id.
+     * @return Partition recovery.
+     */
+    @NotNull private PartitionRecovery getOrCreatePartitionRecovery(GridKernalContext ctx, int partId) {
+        PartitionRecovery rec = rcvs.get(partId);
 
         if (rec == null) {
-            rec = new PartitionRecovery(ctx.log(getClass()), initTopVer,
-                initUpdCntrs == null ? null : initUpdCntrs.get(e.partition()));
+            Long partCntr = null;
 
-            PartitionRecovery oldRec = rcvs.putIfAbsent(e.partition(), rec);
+            AffinityTopologyVersion initTopVer0 = initTopVer;
+
+            if (initTopVer0 != null) {
+                GridCacheContext<K, V> cctx = cacheContext(ctx);
+
+                GridCacheAffinityManager aff = cctx.affinity();
+
+                if (initUpdCntrsPerNode != null) {
+                    for (ClusterNode node : aff.nodes(partId, initTopVer)) {
+                        Map<Integer, Long> map = initUpdCntrsPerNode.get(node.id());
+
+                        if (map != null) {
+                            partCntr = map.get(partId);
+
+                            break;
+                        }
+                    }
+                }
+                else if (initUpdCntrs != null) {
+                    partCntr = initUpdCntrs.get(partId);
+                }
+            }
+
+            rec = new PartitionRecovery(ctx.log(getClass()), initTopVer0, partCntr);
+
+            PartitionRecovery oldRec = rcvs.putIfAbsent(partId, rec);
 
             if (oldRec != null)
                 rec = oldRec;
         }
 
-        return rec.collectEntries(e);
+        return rec;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/fca47ec7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 353043f..869a51b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -649,6 +649,18 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
             autoUnsubscribe,
             pred).get();
 
+        try {
+            if (hnd.isQuery() && cctx.userCache())
+                hnd.waitTopologyFuture(cctx.kernalContext());
+        }
+        catch (IgniteCheckedException e) {
+            log.warning("Failed to start continuous query.", e);
+
+            cctx.kernalContext().continuous().stopRoutine(id);
+
+            throw new IgniteCheckedException("Failed to start continuous query.", e);
+        }
+
         if (notifyExisting) {
             final Iterator<GridCacheEntryEx> it = cctx.cache().allEntries().iterator();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/fca47ec7/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
index 8cd30a8..46e87af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
@@ -154,8 +154,10 @@ public interface GridContinuousHandler extends Externalizable, Cloneable {
     public String cacheName();
 
     /**
+     * @param cntrsPerNode Init state partition counters for node.
      * @param cntrs Init state for partition counters.
      * @param topVer Topology version.
      */
-    public void updateCounters(AffinityTopologyVersion topVer, Map<Integer, Long> cntrs);
+    public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrsPerNode,
+        Map<Integer, Long> cntrs);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/fca47ec7/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 1776748..f2d6e1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -45,7 +45,6 @@ import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteDeploymentCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
@@ -220,25 +219,18 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
                             // Update partition counters.
                             if (routine != null && routine.handler().isQuery()) {
+                                Map<UUID, Map<Integer, Long>> cntrsPerNode = msg.updateCountersPerNode();
                                 Map<Integer, Long> cntrs = msg.updateCounters();
 
                                 GridCacheAdapter<Object, Object> interCache =
                                     ctx.cache().internalCache(routine.handler().cacheName());
 
-                                if (interCache != null && cntrs != null && interCache.context() != null
-                                    && !interCache.isLocal() && !CU.clientNode(ctx.grid().localNode())) {
-                                    Map<Integer, Long> map = interCache.context().topology().updateCounters();
+                                GridCacheContext cctx = interCache != null ? interCache.context() : null;
 
-                                    for (Map.Entry<Integer, Long> e : map.entrySet()) {
-                                        Long cntr0 = cntrs.get(e.getKey());
-                                        Long cntr1 = e.getValue();
+                                if (cctx != null && cntrsPerNode != null && !cctx.isLocal() && cctx.affinityNode())
+                                    cntrsPerNode.put(ctx.localNodeId(), cctx.topology().updateCounters());
 
-                                        if (cntr0 == null || cntr1 > cntr0)
-                                            cntrs.put(e.getKey(), cntr1);
-                                    }
-                                }
-
-                                routine.handler().updateCounters(topVer, msg.updateCounters());
+                                routine.handler().updateCounters(topVer, cntrsPerNode, cntrs);
                             }
 
                             fut.onRemoteRegistered();
@@ -756,7 +748,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 syncMsgFuts.put(futId, fut);
 
                 try {
-                    sendNotification(nodeId, routineId, futId, F.asList(obj), orderedTopic, msg, null);
+                    sendNotification(nodeId, routineId, futId, F.asList(obj), null, msg, null);
                 }
                 catch (IgniteCheckedException e) {
                     syncMsgFuts.remove(futId);
@@ -923,11 +915,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             if (proc != null) {
                 GridCacheAdapter cache = ctx.cache().internalCache(hnd0.cacheName());
 
-                if (cache != null && !cache.isLocal()) {
-                    Map<Integer, Long> cntrs = cache.context().topology().updateCounters();
-
-                    req.addUpdateCounters(cntrs);
-                }
+                if (cache != null && !cache.isLocal())
+                    req.addUpdateCounters(ctx.localNodeId(), cache.context().topology().updateCounters());
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/fca47ec7/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
index 9644372..ca34b27 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.Nullable;
 
@@ -36,18 +37,28 @@ public class StartRoutineAckDiscoveryMessage extends AbstractContinuousMessage {
     private final Map<UUID, IgniteCheckedException> errs;
 
     /** */
+    @GridToStringExclude
     private final Map<Integer, Long> updateCntrs;
 
+    /** */
+    @GridToStringExclude
+    private final Map<UUID, Map<Integer, Long>> updateCntrsPerNode;
+
     /**
      * @param routineId Routine id.
      * @param errs Errs.
+     * @param cntrs Partition counters.
+     * @param cntrsPerNode Partition counters per node.
      */
-    public StartRoutineAckDiscoveryMessage(UUID routineId, Map<UUID, IgniteCheckedException> errs,
-        Map<Integer, Long> cntrs) {
+    public StartRoutineAckDiscoveryMessage(UUID routineId,
+        Map<UUID, IgniteCheckedException> errs,
+        Map<Integer, Long> cntrs,
+        Map<UUID, Map<Integer, Long>> cntrsPerNode) {
         super(routineId);
 
         this.errs = new HashMap<>(errs);
         this.updateCntrs = cntrs;
+        this.updateCntrsPerNode = cntrsPerNode;
     }
 
     /** {@inheritDoc} */
@@ -63,6 +74,13 @@ public class StartRoutineAckDiscoveryMessage extends AbstractContinuousMessage {
     }
 
     /**
+     * @return Update counters for partitions per each node.
+     */
+    public Map<UUID, Map<Integer, Long>> updateCountersPerNode() {
+        return updateCntrsPerNode;
+    }
+
+    /**
      * @return Errs.
      */
     public Map<UUID, IgniteCheckedException> errs() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/fca47ec7/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
index ff037d4..24eb050 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
@@ -40,6 +40,9 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage {
     /** */
     private Map<Integer, Long> updateCntrs;
 
+    /** */
+    private Map<UUID, Map<Integer, Long>> updateCntrsPerNode;
+
     /** Keep binary flag. */
     private boolean keepBinary;
 
@@ -72,7 +75,7 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage {
     /**
      * @param cntrs Update counters.
      */
-    public void addUpdateCounters(Map<Integer, Long> cntrs) {
+    private void addUpdateCounters(Map<Integer, Long> cntrs) {
         if (updateCntrs == null)
             updateCntrs = new HashMap<>();
 
@@ -86,6 +89,21 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage {
     }
 
     /**
+     * @param nodeId Local node ID.
+     * @param cntrs Update counters.
+     */
+    public void addUpdateCounters(UUID nodeId, Map<Integer, Long> cntrs) {
+        addUpdateCounters(cntrs);
+
+        if (updateCntrsPerNode == null)
+            updateCntrsPerNode = new HashMap<>();
+
+        Map<Integer, Long> old = updateCntrsPerNode.put(nodeId, cntrs);
+
+        assert old == null : old;
+    }
+
+    /**
      * @return Errs.
      */
     public Map<UUID, IgniteCheckedException> errs() {
@@ -106,7 +124,7 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage {
 
     /** {@inheritDoc} */
     @Override public DiscoveryCustomMessage ackMessage() {
-        return new StartRoutineAckDiscoveryMessage(routineId, errs, updateCntrs);
+        return new StartRoutineAckDiscoveryMessage(routineId, errs, updateCntrs, updateCntrsPerNode);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/fca47ec7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryConcurrentTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryConcurrentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryConcurrentTest.java
new file mode 100644
index 0000000..29b351b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryConcurrentTest.java
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.cache.configuration.CacheEntryListenerConfiguration;
+import javax.cache.configuration.FactoryBuilder.SingletonFactory;
+import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
+import javax.cache.event.CacheEntryCreatedListener;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
+import org.apache.ignite.internal.util.future.IgniteFutureImpl;
+import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static javax.cache.configuration.FactoryBuilder.factoryOf;
+
+/**
+ *
+ */
+public class GridCacheContinuousQueryConcurrentTest extends GridCommonAbstractTest {
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final int NODES = 2;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        startGridsMultiThreaded(NODES);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        cfg.setPeerClassLoadingEnabled(false);
+
+        if (gridName.endsWith(String.valueOf(NODES)))
+            cfg.setClientMode(ThreadLocalRandom.current().nextBoolean());
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReplicatedTx() throws Exception {
+        testRegistration(cacheConfiguration(CacheMode.REPLICATED, CacheAtomicityMode.TRANSACTIONAL, 1));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRestartReplicated() throws Exception {
+        testRestartRegistration(cacheConfiguration(CacheMode.REPLICATED, CacheAtomicityMode.ATOMIC, 2));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRestartPartition() throws Exception {
+        testRestartRegistration(cacheConfiguration(CacheMode.PARTITIONED, CacheAtomicityMode.ATOMIC, 2));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRestartPartitionTx() throws Exception {
+        testRestartRegistration(cacheConfiguration(CacheMode.PARTITIONED, CacheAtomicityMode.TRANSACTIONAL, 2));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReplicatedAtomic() throws Exception {
+        testRegistration(cacheConfiguration(CacheMode.REPLICATED, CacheAtomicityMode.ATOMIC, 2));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPartitionTx() throws Exception {
+        testRegistration(cacheConfiguration(CacheMode.PARTITIONED, CacheAtomicityMode.TRANSACTIONAL, 2));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPartitionAtomic() throws Exception {
+        testRegistration(cacheConfiguration(CacheMode.PARTITIONED, CacheAtomicityMode.ATOMIC, 2));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRegistration(CacheConfiguration ccfg) throws Exception {
+        ExecutorService execSrv = newSingleThreadExecutor();
+
+        try {
+            final IgniteCache<Integer, String> cache = grid(0).getOrCreateCache(ccfg);
+
+            for (int i = 0; i < 10; i++) {
+                log.info("Start iteration: " + i);
+
+                final int i0 = i;
+                final AtomicBoolean stop = new AtomicBoolean(false);
+                final CountDownLatch latch = new CountDownLatch(1);
+                final int conQryCnt = 50;
+
+                Future<List<IgniteFuture<String>>> fut = execSrv.submit(
+                    new Callable<List<IgniteFuture<String>>>() {
+                        @Override public List<IgniteFuture<String>> call() throws Exception {
+                            int count = 0;
+                            List<IgniteFuture<String>> futures = new ArrayList<>();
+
+                            while (!stop.get()) {
+                                futures.add(waitForKey(i0, cache, count));
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Started cont query count: " + count);
+
+                                if (++count >= conQryCnt)
+                                    latch.countDown();
+                            }
+
+                            return futures;
+                        }
+                    });
+
+                assert U.await(latch, 1, MINUTES);
+
+                cache.put(i, "v");
+
+                stop.set(true);
+
+                List<IgniteFuture<String>> contQries = fut.get();
+
+                for (IgniteFuture<String> contQry : contQries)
+                    contQry.get(2, TimeUnit.SECONDS);
+            }
+        }
+        finally {
+            execSrv.shutdownNow();
+
+            grid(0).destroyCache(ccfg.getName());
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRestartRegistration(CacheConfiguration ccfg) throws Exception {
+        ExecutorService execSrv = newSingleThreadExecutor();
+
+        final AtomicBoolean stopRes = new AtomicBoolean(false);
+
+        IgniteInternalFuture<?> restartFut = null;
+
+        try {
+            final IgniteCache<Integer, String> cache = grid(0).getOrCreateCache(ccfg);
+
+            restartFut = GridTestUtils.runAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    while (!stopRes.get()) {
+                        startGrid(NODES);
+
+                        assert GridTestUtils.waitForCondition(new PA() {
+                            @Override public boolean apply() {
+                                return grid(0).cluster().nodes().size() == NODES + 1;
+                            }
+                        }, 5000L);
+
+                        Thread.sleep(300);
+
+                        stopGrid(NODES);
+
+                        assert GridTestUtils.waitForCondition(new PA() {
+                            @Override public boolean apply() {
+                                return grid(0).cluster().nodes().size() == NODES;
+                            }
+                        }, 5000L);
+
+                        Thread.sleep(300);
+                    }
+
+                    return null;
+                }
+            });
+
+            U.sleep(100);
+
+            for (int i = 0; i < 10; i++) {
+                log.info("Start iteration: " + i);
+
+                final int i0 = i;
+                final AtomicBoolean stop = new AtomicBoolean(false);
+                final CountDownLatch latch = new CountDownLatch(1);
+                final int conQryCnt = 50;
+
+                Future<List<IgniteFuture<String>>> fut = execSrv.submit(
+                    new Callable<List<IgniteFuture<String>>>() {
+                        @Override public List<IgniteFuture<String>> call() throws Exception {
+                            int count = 0;
+                            List<IgniteFuture<String>> futures = new ArrayList<>();
+
+                            while (!stop.get()) {
+                                futures.add(waitForKey(i0, cache, count));
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Started cont query count: " + count);
+
+                                if (++count >= conQryCnt)
+                                    latch.countDown();
+                            }
+
+                            return futures;
+                        }
+                    });
+
+                latch.await();
+
+                cache.put(i, "v");
+
+                assertEquals("v", cache.get(i));
+
+                stop.set(true);
+
+                List<IgniteFuture<String>> contQries = fut.get();
+
+                for (IgniteFuture<String> contQry : contQries)
+                    contQry.get(5, TimeUnit.SECONDS);
+            }
+        }
+        finally {
+            execSrv.shutdownNow();
+
+            grid(0).destroyCache(ccfg.getName());
+
+            if (restartFut != null) {
+                stopRes.set(true);
+
+                restartFut.get();
+
+                stopGrid(NODES);
+            }
+        }
+    }
+
+    /**
+     * @param key Key
+     * @param cache Cache.
+     * @param id ID.
+     * @return Future.
+     */
+    public IgniteFuture<String> waitForKey(Integer key, final IgniteCache<Integer, String> cache, final int id) {
+        String v = cache.get(key);
+
+        // From now on, all futures will be completed immediately (since the key has been
+        // inserted).
+        if (v != null)
+            return new IgniteFinishedFutureImpl<>("immediately");
+
+        final IgniteFuture<String> promise = new IgniteFutureImpl<>(new GridFutureAdapter<String>());
+
+        final CacheEntryListenerConfiguration<Integer, String> cfg =
+            createCacheListener(key, promise, id);
+
+        promise.listen(new IgniteInClosure<IgniteFuture<String>>() {
+            @Override public void apply(IgniteFuture<String> future) {
+                GridTestUtils.runAsync(new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        cache.deregisterCacheEntryListener(cfg);
+
+                        return null;
+                    }
+                });
+            }
+        });
+
+        // Start listening.
+        // Assumption: When the call returns, the listener is guaranteed to have been registered.
+        cache.registerCacheEntryListener(cfg);
+
+        // Now must check the cache again, to make sure that we didn't miss the key insert while we
+        // were busy setting up the cache listener.
+        // Check asynchronously.
+        IgniteCache<Integer, String> asyncCache = cache.withAsync();
+        asyncCache.get(key);
+
+        // Complete the promise if the key was inserted concurrently.
+        asyncCache.<String>future().listen(new IgniteInClosure<IgniteFuture<String>>() {
+            @Override public void apply(IgniteFuture<String> f) {
+                String value = f.get();
+
+                if (value != null) {
+                    log.info("Completed by get: " + id);
+
+                    (((GridFutureAdapter)((IgniteFutureImpl)promise).internalFuture())).onDone("by get");
+                }
+            }
+        });
+
+        return promise;
+    }
+
+    /**
+     * @param key Key.
+     * @param result Result.
+     * @param id Listener ID.
+     * @return Listener
+     */
+    private CacheEntryListenerConfiguration<Integer, String> createCacheListener(
+        Integer key,
+        IgniteFuture<String> result,
+        int id) {
+        return new MutableCacheEntryListenerConfiguration<>(
+            factoryOf(new CacheListener(result, id)),
+            new SingletonFactory<>(new KeyEventFilter(key, id)), false, true);
+    }
+
+
+
+    /**
+     * @param cacheMode Cache mode.
+     * @param atomicMode Atomicy mode.
+     * @param backups Backups.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Integer, String> cacheConfiguration(CacheMode cacheMode,
+        CacheAtomicityMode atomicMode, int backups) {
+        CacheConfiguration<Integer, String> cfg = new CacheConfiguration<>("test-" + cacheMode + atomicMode + backups);
+
+        cfg.setCacheMode(cacheMode);
+        cfg.setAtomicityMode(atomicMode);
+        cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        cfg.setBackups(backups);
+        cfg.setReadFromBackup(false);
+
+        return cfg;
+    }
+
+    /**
+     *
+     */
+    private static class CacheListener implements CacheEntryCreatedListener<Integer, String>, Serializable {
+        /** */
+        final IgniteFuture<String> result;
+
+        /** */
+        private final int id;
+
+        /**
+         * @param result Result.
+         * @param id ID.
+         */
+        CacheListener(IgniteFuture<String> result, int id) {
+            this.result = result;
+            this.id = id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> evts) {
+            (((GridFutureAdapter)((IgniteFutureImpl)result).internalFuture())).onDone("by listener");
+        }
+    }
+
+    /**
+     *
+     */
+    private static class KeyEventFilter implements CacheEntryEventFilter<Integer, String>, Serializable {
+        /** */
+        private static final long serialVersionUID = 42L;
+
+        /** */
+        private final Object key;
+
+        /** */
+        private final int id;
+
+        /**
+         * @param key Key.
+         * @param id ID.
+         */
+        KeyEventFilter(Object key, int id) {
+            this.key = key;
+            this.id = id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends String> e) {
+            return e.getKey().equals(key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            return this == o || !(o == null || getClass() != o.getClass())
+                && key.equals(((KeyEventFilter) o).key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return key.hashCode();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/fca47ec7/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 5a54415..467639f 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -90,6 +90,7 @@ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheCon
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicOffheapValuesTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicP2PDisabledSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryConcurrentTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryLocalAtomicSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryLocalSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryPartitionAtomicOneNodeTest;
@@ -218,6 +219,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         suite.addTestSuite(CacheContinuousQueryFailoverTxOffheapTieredTest.class);
         suite.addTestSuite(CacheContinuousQueryRandomOperationsTest.class);
         suite.addTestSuite(CacheContinuousQueryFactoryFilterTest.class);
+        suite.addTestSuite(GridCacheContinuousQueryConcurrentTest.class);
         suite.addTestSuite(CacheContinuousQueryOperationP2PTest.class);
         suite.addTestSuite(CacheContinuousBatchAckTest.class);
         suite.addTestSuite(CacheContinuousBatchForceServerModeAckTest.class);