You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/10/12 11:24:15 UTC

[01/10] ignite git commit: IGNITE-6545: Failure during Ignite Service.cancel() can break normal shutdown process. This closes #2807.

Repository: ignite
Updated Branches:
  refs/heads/ignite-5932 178006226 -> b73792aec


IGNITE-6545: Failure during Ignite Service.cancel() can break normal shutdown process. This closes #2807.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8ffa1099
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8ffa1099
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8ffa1099

Branch: refs/heads/ignite-5932
Commit: 8ffa1099e2afd14052f7c91be822b2aa3f5f2a8d
Parents: 0f3546a
Author: AMRepo <an...@gmail.com>
Authored: Tue Oct 10 11:57:20 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Wed Oct 11 12:00:26 2017 +0300

----------------------------------------------------------------------
 .../processors/service/GridServiceProcessor.java         | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8ffa1099/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 9272760..6f1dfc7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -316,7 +316,16 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
             Service svc = ctx.service();
 
             if (svc != null)
-                svc.cancel(ctx);
+                try {
+                    svc.cancel(ctx);
+                }
+                catch (Throwable e) {
+                    log.error("Failed to cancel service (ignoring) [name=" + ctx.name() +
+                        ", execId=" + ctx.executionId() + ']', e);
+
+                    if (e instanceof Error)
+                        throw e;
+                }
 
             ctx.executor().shutdownNow();
         }


[06/10] ignite git commit: Merge remote-tracking branch 'remotes/origin/master' into ignite-3478

Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/master' into ignite-3478


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/23742962
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/23742962
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/23742962

Branch: refs/heads/ignite-5932
Commit: 23742962f8d539aac33a7ac953f09a1407b330e9
Parents: 970cf47 b8b7c50
Author: sboikov <sb...@gridgain.com>
Authored: Thu Oct 12 13:36:52 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Oct 12 13:36:52 2017 +0300

----------------------------------------------------------------------
 .../internal/MarshallerMappingFileStore.java    |   15 +-
 .../service/GridServiceProcessor.java           |   11 +-
 .../nio/GridAbstractCommunicationClient.java    |    2 +-
 .../communication/tcp/TcpCommunicationSpi.java  |  378 +++---
 .../IgniteMarshallerCacheFSRestoreTest.java     |   71 +-
 .../Apache.Ignite.Core.csproj                   |    1 -
 .../Impl/Binary/Io/BinaryHeapStream.cs          | 1018 +++++++++++++-
 .../Impl/Binary/Io/BinaryStreamBase.cs          | 1249 ------------------
 8 files changed, 1237 insertions(+), 1508 deletions(-)
----------------------------------------------------------------------



[03/10] ignite git commit: IGNITE-6536 Node fails when detects mapping storage corruption

Posted by sb...@apache.org.
IGNITE-6536 Node fails when detects mapping storage corruption

Signed-off-by: Andrey Gura <ag...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5490c7d9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5490c7d9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5490c7d9

Branch: refs/heads/ignite-5932
Commit: 5490c7d927daca2d64f108b6c2eafe03bbd6f54e
Parents: b0158fb
Author: Sergey Chugunov <se...@gmail.com>
Authored: Wed Oct 11 15:33:23 2017 +0300
Committer: Andrey Gura <ag...@apache.org>
Committed: Wed Oct 11 15:34:06 2017 +0300

----------------------------------------------------------------------
 .../internal/MarshallerMappingFileStore.java    | 15 +++--
 .../IgniteMarshallerCacheFSRestoreTest.java     | 71 +++++++++++++++++++-
 2 files changed, 77 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5490c7d9/modules/core/src/main/java/org/apache/ignite/internal/MarshallerMappingFileStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerMappingFileStore.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerMappingFileStore.java
index eabbdb8..59a99b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerMappingFileStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerMappingFileStore.java
@@ -167,16 +167,19 @@ final class MarshallerMappingFileStore {
 
             try (FileInputStream in = new FileInputStream(file)) {
                 try (BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
-                    String className = reader.readLine();
+                    String clsName = reader.readLine();
 
-                    marshCtx.registerClassNameLocally(platformId, typeId, className);
+                    if (clsName == null) {
+                        throw new IgniteCheckedException("Class name is null for [platformId=" + platformId +
+                            ", typeId=" + typeId + "], marshaller mappings storage is broken. " +
+                            "Clean up marshaller directory (<work_dir>/marshaller) and restart the node.");
+                    }
+
+                    marshCtx.registerClassNameLocally(platformId, typeId, clsName);
                 }
             }
             catch (IOException e) {
-                throw new IgniteCheckedException("Reading marshaller mapping from file "
-                    + name
-                    + " failed."
-                    , e);
+                throw new IgniteCheckedException("Reading marshaller mapping from file " + name + " failed.", e);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5490c7d9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheFSRestoreTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheFSRestoreTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheFSRestoreTest.java
index 21a3e43..ac15971 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheFSRestoreTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheFSRestoreTest.java
@@ -23,13 +23,16 @@ import java.io.Writer;
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
 import java.util.Map;
+import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.PersistentStoreConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.processors.marshaller.MappingProposedMessage;
@@ -47,6 +50,9 @@ public class IgniteMarshallerCacheFSRestoreTest extends GridCommonAbstractTest {
     /** */
     private volatile boolean isDuplicateObserved = true;
 
+    /** */
+    private boolean isPersistenceEnabled;
+
     /**
      *
      */
@@ -67,6 +73,7 @@ public class IgniteMarshallerCacheFSRestoreTest extends GridCommonAbstractTest {
         }
     }
 
+    /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
 
@@ -75,13 +82,17 @@ public class IgniteMarshallerCacheFSRestoreTest extends GridCommonAbstractTest {
 
         cfg.setDiscoverySpi(discoSpi);
 
-        CacheConfiguration singleCacheConfig = new CacheConfiguration()
+        CacheConfiguration singleCacheCfg = new CacheConfiguration()
             .setName(DEFAULT_CACHE_NAME)
             .setCacheMode(CacheMode.PARTITIONED)
             .setBackups(1)
             .setAtomicityMode(CacheAtomicityMode.ATOMIC);
 
-        cfg.setCacheConfiguration(singleCacheConfig);
+        cfg.setCacheConfiguration(singleCacheCfg);
+
+        // Persistence must be enabled to verify the case of restoring mappings from the file system.
+        if (isPersistenceEnabled)
+            cfg.setPersistentStoreConfiguration(new PersistentStoreConfiguration());
 
         return cfg;
     }
@@ -110,11 +121,14 @@ public class IgniteMarshallerCacheFSRestoreTest extends GridCommonAbstractTest {
      * In that case the request must not be marked as duplicate and must be processed in a regular way.
      * No hangs must take place.
      *
-     * @see <a href="https://issues.apache.org/jira/browse/IGNITE-5401">IGNITE-5401</a> Take a look at JIRA ticket for more information about context of this test.
+     * @see <a href="https://issues.apache.org/jira/browse/IGNITE-5401">IGNITE-5401</a> JIRA ticket
+     * provides more information about context of this test.
      *
      * This test must never hang on proposing of MarshallerMapping.
      */
     public void testFileMappingReadAndPropose() throws Exception {
+        isPersistenceEnabled = false;
+
         prepareMarshallerFileStore();
 
         IgniteEx ignite0 = startGrid(0);
@@ -162,6 +176,57 @@ public class IgniteMarshallerCacheFSRestoreTest extends GridCommonAbstractTest {
         }
     }
 
+    /**
+     * Verifies that a node with a corrupted marshaller mapping store fails on startup
+     * with an appropriate error message.
+     *
+     * @see <a href="https://issues.apache.org/jira/browse/IGNITE-6536">IGNITE-6536</a> JIRA provides more information
+     * about this case.
+     */
+    public void testNodeStartFailsOnCorruptedStorage() throws Exception {
+        isPersistenceEnabled = true;
+
+        Ignite ig = startGrids(3);
+
+        ig.active(true);
+
+        ig.cache(DEFAULT_CACHE_NAME).put(0, new SimpleValue(0, "value0"));
+
+        stopAllGrids();
+
+        corruptMarshallerStorage();
+
+        try {
+            startGrid(0);
+        }
+        catch (IgniteCheckedException e) {
+            verifyException((IgniteCheckedException) e.getCause());
+        }
+    }
+
+    /**
+     * Class name for CustomClass class mapping file gets cleaned up from file system.
+     */
+    private void corruptMarshallerStorage() throws Exception {
+        String marshallerDir = U.defaultWorkDirectory() + File.separator + "marshaller";
+
+        File[] storedMappingsFiles = new File(marshallerDir).listFiles();
+
+        assert storedMappingsFiles.length == 1;
+
+        try (FileOutputStream out = new FileOutputStream(storedMappingsFiles[0])) {
+            out.getChannel().truncate(0);
+        }
+    }
+
+    /** */
+    private void verifyException(IgniteCheckedException e) throws Exception {
+        String msg = e.getMessage();
+
+        if (msg == null || !msg.contains("Class name is null"))
+            throw new Exception("Exception with unexpected message was thrown: " + msg, e);
+    }
+
     /** */
     private class TestTcpDiscoverySpi extends TcpDiscoverySpi {
 


[02/10] ignite git commit: IGNITE-6542 Reliably close SocketChannel in TcpCommunicationSpi.

Posted by sb...@apache.org.
IGNITE-6542 Reliably close SocketChannel in TcpCommunicationSpi.

Also fix forceClose() in GridTcpNioCommunicationClient which became wrong when migrated from int to bool. - Fixes #2787.

Signed-off-by: Alexey Goncharuk <al...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b0158fb2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b0158fb2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b0158fb2

Branch: refs/heads/ignite-5932
Commit: b0158fb2ab6de20922cc1b19597c5e17dae0b527
Parents: 8ffa109
Author: Ilya Kasnacheev <il...@gmail.com>
Authored: Wed Oct 11 15:29:04 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Wed Oct 11 15:29:04 2017 +0300

----------------------------------------------------------------------
 .../nio/GridAbstractCommunicationClient.java    |   2 +-
 .../communication/tcp/TcpCommunicationSpi.java  | 378 ++++++++++---------
 2 files changed, 192 insertions(+), 188 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b0158fb2/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
index 6302d84..ed7e929 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
@@ -59,7 +59,7 @@ public abstract class GridAbstractCommunicationClient implements GridCommunicati
 
     /** {@inheritDoc} */
     @Override public void forceClose() {
-        closed.set(false);
+        closed.set(true);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/b0158fb2/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 7a54666..a0ee389 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -2894,12 +2894,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
             }
 
             try {
-                safeHandshake(client,
-                    null,
-                    node.id(),
-                    timeoutHelper.nextTimeoutChunk(connTimeout0),
-                    null,
-                    null);
+                safeShmemHandshake(client, node.id(), timeoutHelper.nextTimeoutChunk(connTimeout0));
             }
             catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) {
                 client.forceClose();
@@ -3063,7 +3058,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
     protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
         LinkedHashSet<InetSocketAddress> addrs = nodeAddresses(node);
 
-        boolean conn = false;
         GridCommunicationClient client = null;
         IgniteCheckedException errs = null;
 
@@ -3079,7 +3073,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
 
             int lastWaitingTimeout = 1;
 
-            while (!conn) { // Reconnection on handshake timeout.
+            while (client == null) { // Reconnection on handshake timeout.
+                boolean needWait = false;
+
                 try {
                     SocketChannel ch = SocketChannel.open();
 
@@ -3111,7 +3107,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                         return null;
                     }
 
-                    Long rcvCnt = null;
+                    Long rcvCnt;
 
                     Map<Integer, Object> meta = new HashMap<>();
 
@@ -3132,7 +3128,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
 
                         Integer handshakeConnIdx = connIdx;
 
-                        rcvCnt = safeHandshake(ch,
+                        rcvCnt = safeTcpHandshake(ch,
                             recoveryDesc,
                             node.id(),
                             timeoutHelper.nextTimeoutChunk(connTimeout0),
@@ -3140,34 +3136,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                             handshakeConnIdx);
 
                         if (rcvCnt == ALREADY_CONNECTED) {
-                            recoveryDesc.release();
-
                             return null;
                         }
                         else if (rcvCnt == NODE_STOPPING) {
-                            recoveryDesc.release();
-
                             throw new ClusterTopologyCheckedException("Remote node started stop procedure: " + node.id());
                         }
                         else if (rcvCnt == NEED_WAIT) {
-                            recoveryDesc.release();
-
-                            U.closeQuiet(ch);
-
-                            if (lastWaitingTimeout < 60000)
-                                lastWaitingTimeout *= 2;
-
-                            U.sleep(lastWaitingTimeout);
+                            needWait = true;
 
                             continue;
                         }
-                    }
-                    finally {
-                        if (recoveryDesc != null && rcvCnt == null)
-                            recoveryDesc.release();
-                    }
 
-                    try {
                         meta.put(CONN_IDX_META, connKey);
 
                         if (recoveryDesc != null) {
@@ -3179,13 +3158,20 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                         GridNioSession ses = nioSrvr.createSession(ch, meta, false, null).get();
 
                         client = new GridTcpNioCommunicationClient(connIdx, ses, log);
-
-                        conn = true;
                     }
                     finally {
-                        if (!conn) {
+                        if (client == null) {
+                            U.closeQuiet(ch);
+
                             if (recoveryDesc != null)
                                 recoveryDesc.release();
+
+                            if (needWait) {
+                                if (lastWaitingTimeout < 60000)
+                                    lastWaitingTimeout *= 2;
+
+                                U.sleep(lastWaitingTimeout);
+                            }
                         }
                     }
                 }
@@ -3307,7 +3293,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                 }
             }
 
-            if (conn)
+            if (client != null)
                 break;
         }
 
@@ -3362,6 +3348,42 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
      * Performs handshake in timeout-safe way.
      *
      * @param client Client.
+     * @param rmtNodeId Remote node.
+     * @param timeout Timeout for handshake.
+     * @throws IgniteCheckedException If handshake failed or wasn't completed within timeout.
+     */
+    @SuppressWarnings("ThrowFromFinallyBlock")
+    private void safeShmemHandshake(
+        GridCommunicationClient client,
+        UUID rmtNodeId,
+        long timeout
+    ) throws IgniteCheckedException {
+        HandshakeTimeoutObject<GridCommunicationClient> obj = new HandshakeTimeoutObject<>(client,
+            U.currentTimeMillis() + timeout);
+
+        addTimeoutObject(obj);
+
+        try {
+            client.doHandshake(new HandshakeClosure(rmtNodeId));
+        }
+        finally {
+            boolean cancelled = obj.cancel();
+
+            if (cancelled)
+                removeTimeoutObject(obj);
+
+            // Ignoring whatever happened after timeout - reporting only timeout event.
+            if (!cancelled)
+                throw new HandshakeTimeoutException(
+                    new IgniteSpiOperationTimeoutException("Failed to perform handshake due to timeout " +
+                        "(consider increasing 'connectionTimeout' configuration property)."));
+        }
+    }
+
+    /**
+     * Performs handshake in timeout-safe way.
+     *
+     * @param ch Socket channel.
      * @param recovery Recovery descriptor if use recovery handshake, otherwise {@code null}.
      * @param rmtNodeId Remote node.
      * @param timeout Timeout for handshake.
@@ -3371,233 +3393,215 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
      * @return Handshake response.
      */
     @SuppressWarnings("ThrowFromFinallyBlock")
-    private <T> long safeHandshake(
-        T client,
+    private long safeTcpHandshake(
+        SocketChannel ch,
         @Nullable GridNioRecoveryDescriptor recovery,
         UUID rmtNodeId,
         long timeout,
         GridSslMeta sslMeta,
         @Nullable Integer handshakeConnIdx
     ) throws IgniteCheckedException {
-        HandshakeTimeoutObject<T> obj = new HandshakeTimeoutObject<>(client, U.currentTimeMillis() + timeout);
+        HandshakeTimeoutObject obj = new HandshakeTimeoutObject<>(ch, U.currentTimeMillis() + timeout);
 
         addTimeoutObject(obj);
 
         long rcvCnt = 0;
 
         try {
-            if (client instanceof GridCommunicationClient)
-                ((GridCommunicationClient)client).doHandshake(new HandshakeClosure(rmtNodeId));
-            else {
-                SocketChannel ch = (SocketChannel)client;
+            BlockingSslHandler sslHnd = null;
 
-                boolean success = false;
+            ByteBuffer buf;
 
-                try {
-                    BlockingSslHandler sslHnd = null;
+            if (isSslEnabled()) {
+                assert sslMeta != null;
 
-                    ByteBuffer buf;
+                sslHnd = new BlockingSslHandler(sslMeta.sslEngine(), ch, directBuf, ByteOrder.nativeOrder(), log);
 
-                    if (isSslEnabled()) {
-                        assert sslMeta != null;
+                if (!sslHnd.handshake())
+                    throw new HandshakeException("SSL handshake is not completed.");
 
-                        sslHnd = new BlockingSslHandler(sslMeta.sslEngine(), ch, directBuf, ByteOrder.nativeOrder(), log);
+                ByteBuffer handBuff = sslHnd.applicationBuffer();
 
-                        if (!sslHnd.handshake())
-                            throw new HandshakeException("SSL handshake is not completed.");
+                if (handBuff.remaining() < NodeIdMessage.MESSAGE_FULL_SIZE) {
+                    buf = ByteBuffer.allocate(1000);
 
-                        ByteBuffer handBuff = sslHnd.applicationBuffer();
+                    int read = ch.read(buf);
 
-                        if (handBuff.remaining() < NodeIdMessage.MESSAGE_FULL_SIZE) {
-                            buf = ByteBuffer.allocate(1000);
+                    if (read == -1)
+                        throw new HandshakeException("Failed to read remote node ID (connection closed).");
 
-                            int read = ch.read(buf);
+                    buf.flip();
 
-                            if (read == -1)
-                                throw new HandshakeException("Failed to read remote node ID (connection closed).");
-
-                            buf.flip();
-
-                            buf = sslHnd.decode(buf);
-                        }
-                        else
-                            buf = handBuff;
-                    }
-                    else {
-                        buf = ByteBuffer.allocate(NodeIdMessage.MESSAGE_FULL_SIZE);
+                    buf = sslHnd.decode(buf);
+                }
+                else
+                    buf = handBuff;
+            }
+            else {
+                buf = ByteBuffer.allocate(NodeIdMessage.MESSAGE_FULL_SIZE);
 
-                        for (int i = 0; i < NodeIdMessage.MESSAGE_FULL_SIZE; ) {
-                            int read = ch.read(buf);
+                for (int i = 0; i < NodeIdMessage.MESSAGE_FULL_SIZE; ) {
+                    int read = ch.read(buf);
 
-                            if (read == -1)
-                                throw new HandshakeException("Failed to read remote node ID (connection closed).");
+                    if (read == -1)
+                        throw new HandshakeException("Failed to read remote node ID (connection closed).");
 
-                            i += read;
-                        }
-                    }
+                    i += read;
+                }
+            }
 
-                    UUID rmtNodeId0 = U.bytesToUuid(buf.array(), Message.DIRECT_TYPE_SIZE);
+            UUID rmtNodeId0 = U.bytesToUuid(buf.array(), Message.DIRECT_TYPE_SIZE);
 
-                    if (!rmtNodeId.equals(rmtNodeId0))
-                        throw new HandshakeException("Remote node ID is not as expected [expected=" + rmtNodeId +
-                            ", rcvd=" + rmtNodeId0 + ']');
-                    else if (log.isDebugEnabled())
-                        log.debug("Received remote node ID: " + rmtNodeId0);
+            if (!rmtNodeId.equals(rmtNodeId0))
+                throw new HandshakeException("Remote node ID is not as expected [expected=" + rmtNodeId +
+                    ", rcvd=" + rmtNodeId0 + ']');
+            else if (log.isDebugEnabled())
+                log.debug("Received remote node ID: " + rmtNodeId0);
 
-                    if (isSslEnabled()) {
-                        assert sslHnd != null;
+            if (isSslEnabled()) {
+                assert sslHnd != null;
 
-                        ch.write(sslHnd.encrypt(ByteBuffer.wrap(U.IGNITE_HEADER)));
-                    }
-                    else
-                        ch.write(ByteBuffer.wrap(U.IGNITE_HEADER));
+                ch.write(sslHnd.encrypt(ByteBuffer.wrap(U.IGNITE_HEADER)));
+            }
+            else
+                ch.write(ByteBuffer.wrap(U.IGNITE_HEADER));
 
-                    ClusterNode locNode = getLocalNode();
+            ClusterNode locNode = getLocalNode();
 
-                    if (locNode == null)
-                        throw new IgniteCheckedException("Local node has not been started or " +
-                            "fully initialized [isStopping=" + getSpiContext().isStopping() + ']');
+            if (locNode == null)
+                throw new IgniteCheckedException("Local node has not been started or " +
+                    "fully initialized [isStopping=" + getSpiContext().isStopping() + ']');
 
-                    if (recovery != null) {
-                        HandshakeMessage msg;
+            if (recovery != null) {
+                HandshakeMessage msg;
 
-                        int msgSize = HandshakeMessage.MESSAGE_FULL_SIZE;
+                int msgSize = HandshakeMessage.MESSAGE_FULL_SIZE;
 
-                        if (handshakeConnIdx != null) {
-                            msg = new HandshakeMessage2(locNode.id(),
-                                recovery.incrementConnectCount(),
-                                recovery.received(),
-                                handshakeConnIdx);
+                if (handshakeConnIdx != null) {
+                    msg = new HandshakeMessage2(locNode.id(),
+                        recovery.incrementConnectCount(),
+                        recovery.received(),
+                        handshakeConnIdx);
 
-                            msgSize += 4;
-                        }
-                        else {
-                            msg = new HandshakeMessage(locNode.id(),
-                                recovery.incrementConnectCount(),
-                                recovery.received());
-                        }
+                    msgSize += 4;
+                }
+                else {
+                    msg = new HandshakeMessage(locNode.id(),
+                        recovery.incrementConnectCount(),
+                        recovery.received());
+                }
 
-                        if (log.isDebugEnabled())
-                            log.debug("Writing handshake message [locNodeId=" + locNode.id() +
-                                ", rmtNode=" + rmtNodeId + ", msg=" + msg + ']');
+                if (log.isDebugEnabled())
+                    log.debug("Writing handshake message [locNodeId=" + locNode.id() +
+                        ", rmtNode=" + rmtNodeId + ", msg=" + msg + ']');
 
-                        buf = ByteBuffer.allocate(msgSize);
+                buf = ByteBuffer.allocate(msgSize);
 
-                        buf.order(ByteOrder.nativeOrder());
+                buf.order(ByteOrder.nativeOrder());
 
-                        boolean written = msg.writeTo(buf, null);
+                boolean written = msg.writeTo(buf, null);
 
-                        assert written;
+                assert written;
 
-                        buf.flip();
+                buf.flip();
 
-                        if (isSslEnabled()) {
-                            assert sslHnd != null;
+                if (isSslEnabled()) {
+                    assert sslHnd != null;
 
-                            ch.write(sslHnd.encrypt(buf));
-                        }
-                        else
-                            ch.write(buf);
-                    }
-                    else {
-                        if (isSslEnabled()) {
-                            assert sslHnd != null;
+                    ch.write(sslHnd.encrypt(buf));
+                }
+                else
+                    ch.write(buf);
+            }
+            else {
+                if (isSslEnabled()) {
+                    assert sslHnd != null;
 
-                            ch.write(sslHnd.encrypt(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType)));
-                        }
-                        else
-                            ch.write(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType));
-                    }
+                    ch.write(sslHnd.encrypt(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType)));
+                }
+                else
+                    ch.write(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType));
+            }
 
-                    if (recovery != null) {
-                        if (log.isDebugEnabled())
-                            log.debug("Waiting for handshake [rmtNode=" + rmtNodeId + ']');
+            if (recovery != null) {
+                if (log.isDebugEnabled())
+                    log.debug("Waiting for handshake [rmtNode=" + rmtNodeId + ']');
 
-                        if (isSslEnabled()) {
-                            assert sslHnd != null;
+                if (isSslEnabled()) {
+                    assert sslHnd != null;
 
-                            buf = ByteBuffer.allocate(1000);
-                            buf.order(ByteOrder.nativeOrder());
+                    buf = ByteBuffer.allocate(1000);
+                    buf.order(ByteOrder.nativeOrder());
 
-                            ByteBuffer decode = ByteBuffer.allocate(2 * buf.capacity());
-                            decode.order(ByteOrder.nativeOrder());
+                    ByteBuffer decode = ByteBuffer.allocate(2 * buf.capacity());
+                    decode.order(ByteOrder.nativeOrder());
 
-                            for (int i = 0; i < RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE; ) {
-                                int read = ch.read(buf);
+                    for (int i = 0; i < RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE; ) {
+                        int read = ch.read(buf);
 
-                                if (read == -1)
-                                    throw new HandshakeException("Failed to read remote node recovery handshake " +
-                                        "(connection closed).");
+                        if (read == -1)
+                            throw new HandshakeException("Failed to read remote node recovery handshake " +
+                                "(connection closed).");
 
-                                buf.flip();
+                        buf.flip();
 
-                                ByteBuffer decode0 = sslHnd.decode(buf);
+                        ByteBuffer decode0 = sslHnd.decode(buf);
 
-                                i += decode0.remaining();
+                        i += decode0.remaining();
 
-                                decode = appendAndResizeIfNeeded(decode, decode0);
+                        decode = appendAndResizeIfNeeded(decode, decode0);
 
-                                buf.clear();
-                            }
+                        buf.clear();
+                    }
 
-                            decode.flip();
+                    decode.flip();
 
-                            rcvCnt = decode.getLong(Message.DIRECT_TYPE_SIZE);
+                    rcvCnt = decode.getLong(Message.DIRECT_TYPE_SIZE);
 
-                            if (decode.limit() > RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE) {
-                                decode.position(RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE);
+                    if (decode.limit() > RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE) {
+                        decode.position(RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE);
 
-                                sslMeta.decodedBuffer(decode);
-                            }
+                        sslMeta.decodedBuffer(decode);
+                    }
 
-                            ByteBuffer inBuf = sslHnd.inputBuffer();
+                    ByteBuffer inBuf = sslHnd.inputBuffer();
 
-                            if (inBuf.position() > 0)
-                                sslMeta.encodedBuffer(inBuf);
-                        }
-                        else {
-                            buf = ByteBuffer.allocate(RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE);
+                    if (inBuf.position() > 0)
+                        sslMeta.encodedBuffer(inBuf);
+                }
+                else {
+                    buf = ByteBuffer.allocate(RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE);
 
-                            buf.order(ByteOrder.nativeOrder());
+                    buf.order(ByteOrder.nativeOrder());
 
-                            for (int i = 0; i < RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE; ) {
-                                int read = ch.read(buf);
+                    for (int i = 0; i < RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE; ) {
+                        int read = ch.read(buf);
 
-                                if (read == -1)
-                                    throw new HandshakeException("Failed to read remote node recovery handshake " +
-                                        "(connection closed).");
+                        if (read == -1)
+                            throw new HandshakeException("Failed to read remote node recovery handshake " +
+                                "(connection closed).");
 
-                                i += read;
-                            }
+                        i += read;
+                    }
 
-                            rcvCnt = buf.getLong(Message.DIRECT_TYPE_SIZE);
-                        }
+                    rcvCnt = buf.getLong(Message.DIRECT_TYPE_SIZE);
+                }
 
-                        if (log.isDebugEnabled())
-                            log.debug("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']');
+                if (log.isDebugEnabled())
+                    log.debug("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']');
 
-                        if (rcvCnt == -1) {
-                            if (log.isDebugEnabled())
-                                log.debug("Connection rejected, will retry client creation [rmtNode=" + rmtNodeId + ']');
-                        }
-                        else
-                            success = true;
-                    }
-                    else
-                        success = true;
-                }
-                catch (IOException e) {
+                if (rcvCnt == -1) {
                     if (log.isDebugEnabled())
-                        log.debug("Failed to read from channel: " + e);
-
-                    throw new IgniteCheckedException("Failed to read from channel.", e);
-                }
-                finally {
-                    if (!success)
-                        U.closeQuiet(ch);
+                        log.debug("Connection rejected, will retry client creation [rmtNode=" + rmtNodeId + ']');
                 }
             }
         }
+        catch (IOException e) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to read from channel: " + e);
+
+            throw new IgniteCheckedException("Failed to read from channel.", e);
+        }
         finally {
             boolean cancelled = obj.cancel();
 


[10/10] ignite git commit: ignite-5932

Posted by sb...@apache.org.
ignite-5932


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b73792ae
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b73792ae
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b73792ae

Branch: refs/heads/ignite-5932
Commit: b73792aece2cd05a294d46d6befe0496f7ab1772
Parents: 031928e
Author: sboikov <sb...@gridgain.com>
Authored: Thu Oct 12 13:52:51 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Oct 12 14:24:00 2017 +0300

----------------------------------------------------------------------
 .../distributed/dht/GridDhtTxFinishFuture.java  |  4 +-
 .../near/GridNearTxFinishAndAckFuture.java      |  4 +-
 .../near/GridNearTxFinishFuture.java            | 25 +++++++---
 .../cache/distributed/near/GridNearTxLocal.java |  2 -
 .../cache/mvcc/CacheCoordinatorsProcessor.java  | 50 ++++++++++++--------
 .../processors/cache/mvcc/MvccQueryTracker.java | 24 ++++++----
 .../processors/cache/mvcc/TxMvccInfo.java       | 12 +++--
 .../cache/mvcc/CacheMvccTransactionsTest.java   | 34 ++++++++++++-
 8 files changed, 108 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b73792ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index d624e2c..cb2eaa5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -300,7 +300,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
 
             assert mvccInfo != null;
 
-            IgniteInternalFuture fut = cctx.coordinators().waitTxsFuture(mvccInfo.coordinator(), waitTxs);
+            IgniteInternalFuture fut = cctx.coordinators().waitTxsFuture(mvccInfo.coordinatorNodeId(), waitTxs);
 
             add(fut);
 
@@ -412,7 +412,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
         if (tx.onePhaseCommit())
             return false;
 
-        assert !commit || !tx.txState().mvccEnabled(cctx) || tx.mvccInfo() != null;
+        assert !commit || !tx.txState().mvccEnabled(cctx) || tx.mvccInfo() != null || F.isEmpty(tx.writeEntries());
 
         boolean sync = tx.syncMode() == FULL_SYNC;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b73792ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
index 5d8b77c..36efe2f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
@@ -61,9 +61,9 @@ public class GridNearTxFinishAndAckFuture extends GridFutureAdapter<IgniteIntern
                     TxMvccInfo mvccInfo = tx.mvccInfo();
 
                     if (qryTracker != null)
-                        ackFut = qryTracker.onTxFinish(mvccInfo, fut.context());
+                        ackFut = qryTracker.onTxDone(mvccInfo, fut.context(), true);
                     else if (mvccInfo != null) {
-                        ackFut = fut.context().coordinators().ackTxCommit(mvccInfo.coordinator(),
+                        ackFut = fut.context().coordinators().ackTxCommit(mvccInfo.coordinatorNodeId(),
                             mvccInfo.version(),
                             null);
                     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b73792ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index a9b60d7..1116c02 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
 import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
@@ -403,6 +404,20 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
             fut.getClass() == CheckRemoteTxMiniFuture.class;
     }
 
+    /**
+     *
+     */
+    private void ackMvccCoordinatorOnRollback() {
+        TxMvccInfo mvccInfo = tx.mvccInfo();
+
+        MvccQueryTracker qryTracker = tx.mvccQueryTracker();
+
+        if (qryTracker != null)
+            qryTracker.onTxDone(mvccInfo, cctx, false);
+        else if (mvccInfo != null)
+            cctx.coordinators().ackTxRollback(mvccInfo.coordinatorNodeId(), mvccInfo.version(), null);
+    }
+
     /** {@inheritDoc} */
     @SuppressWarnings("ForLoopReplaceableByForEach")
     public void finish(boolean commit, boolean clearThreadMap) {
@@ -421,11 +436,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
             return;
         }
 
-        if (!commit && tx.mvccInfo() != null) {
-            TxMvccInfo mvccInfo = tx.mvccInfo();
-
-            cctx.coordinators().ackTxRollback(mvccInfo.coordinator(), mvccInfo.version());
-        }
+        if (!commit)
+            ackMvccCoordinatorOnRollback();
 
         try {
             if (tx.localFinish(commit, clearThreadMap) || (!commit && tx.state() == UNKNOWN)) {
@@ -436,7 +448,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
 
                     assert mvccInfo != null;
 
-                    IgniteInternalFuture fut = cctx.coordinators().waitTxsFuture(mvccInfo.coordinator(), waitTxs);
+                    IgniteInternalFuture fut = cctx.coordinators().waitTxsFuture(mvccInfo.coordinatorNodeId(),
+                        waitTxs);
 
                     add(fut);
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b73792ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index c774f93..51d842c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -3373,8 +3373,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
         if (!FINISH_FUT_UPD.compareAndSet(this, null, fut0 = new GridNearTxFinishFuture<>(cctx, this, false)))
             return chainFinishFuture(finishFut, false);
 
-        cctx.mvcc().addFuture(fut0, fut0.futureId());
-
         IgniteInternalFuture<?> prepFut = this.prepFut;
 
         if (prepFut == null || prepFut.isDone()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/b73792ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
index a9a5eba..a5a9b0a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
@@ -454,47 +454,57 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
 
         ackFuts.put(fut.id, fut);
 
-        MvccCoordinatorMessage msg;
+        CoordinatorAckRequestTx msg = createTxAckMessage(fut.id, updateVer, readVer);
+
+        try {
+            ctx.io().sendToGridTopic(crd, MSG_TOPIC, msg, MSG_POLICY);
+        }
+        catch (IgniteCheckedException e) {
+            if (ackFuts.remove(fut.id) != null) {
+                if (e instanceof ClusterTopologyCheckedException)
+                    fut.onDone(); // No need to ack, finish without error.
+                else
+                    fut.onDone(e);
+            }
+        }
+
+        return fut;
+    }
+
+    private CoordinatorAckRequestTx createTxAckMessage(long futId,
+        MvccCoordinatorVersion updateVer,
+        @Nullable MvccCoordinatorVersion readVer)
+    {
+        CoordinatorAckRequestTx msg;
 
         if (readVer != null) {
             long trackCntr = queryTrackCounter(readVer);
 
             if (readVer.coordinatorVersion() == updateVer.coordinatorVersion()) {
-                msg = new CoordinatorAckRequestTxAndQuery(fut.id,
+                msg = new CoordinatorAckRequestTxAndQuery(futId,
                     updateVer.counter(),
                     trackCntr);
             }
             else {
-                msg = new CoordinatorAckRequestTxAndQueryEx(fut.id,
+                msg = new CoordinatorAckRequestTxAndQueryEx(futId,
                     updateVer.counter(),
                     readVer.coordinatorVersion(),
                     trackCntr);
             }
         }
         else
-            msg = new CoordinatorAckRequestTx(fut.id, updateVer.counter());
-
-        try {
-            ctx.io().sendToGridTopic(crd, MSG_TOPIC, msg, MSG_POLICY);
-        }
-        catch (IgniteCheckedException e) {
-            if (ackFuts.remove(fut.id) != null) {
-                if (e instanceof ClusterTopologyCheckedException)
-                    fut.onDone(); // No need to ack, finish without error.
-                else
-                    fut.onDone(e);
-            }
-        }
+            msg = new CoordinatorAckRequestTx(futId, updateVer.counter());
 
-        return fut;
+        return msg;
     }
 
     /**
      * @param crdId Coordinator node ID.
-     * @param mvccVer Transaction version.
+     * @param updateVer Transaction update version.
+     * @param readVer Transaction read version.
      */
-    public void ackTxRollback(UUID crdId, MvccCoordinatorVersion mvccVer) {
-        CoordinatorAckRequestTx msg = new CoordinatorAckRequestTx(0, mvccVer.counter());
+    public void ackTxRollback(UUID crdId, MvccCoordinatorVersion updateVer, @Nullable MvccCoordinatorVersion readVer) {
+        CoordinatorAckRequestTx msg = createTxAckMessage(0, updateVer, readVer);
 
         msg.skipResponse(true);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b73792ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
index e45b77c..0e3eb7b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
@@ -116,7 +116,13 @@ public class MvccQueryTracker {
             cctx.shared().coordinators().ackQueryDone(mvccCrd0, mvccVer0);
     }
 
-    public IgniteInternalFuture<Void> onTxFinish(@Nullable TxMvccInfo mvccInfo, GridCacheSharedContext ctx) {
+    /**
+     * @param mvccInfo Mvcc update info.
+     * @param ctx Context.
+     * @param commit If {@code true} ack commit, otherwise rollback.
+     * @return Commit ack future.
+     */
+    public IgniteInternalFuture<Void> onTxDone(@Nullable TxMvccInfo mvccInfo, GridCacheSharedContext ctx, boolean commit) {
         MvccCoordinator mvccCrd0 = null;
         MvccCoordinatorVersion mvccVer0 = null;
 
@@ -131,24 +137,22 @@ public class MvccQueryTracker {
             }
         }
 
-        if (mvccVer0 != null) {
+        assert mvccVer0 == null || mvccInfo == null || mvccInfo.coordinatorNodeId().equals(mvccCrd0.nodeId());
+
+        if (mvccVer0 != null || mvccInfo != null) {
             if (mvccInfo == null) {
                 cctx.shared().coordinators().ackQueryDone(mvccCrd0, mvccVer0);
 
                 return null;
             }
-            else if (mvccInfo.coordinator().equals(mvccCrd0.nodeId()))
-                return ctx.coordinators().ackTxCommit(mvccInfo.coordinator(), mvccInfo.version(), mvccVer0);
             else {
-                cctx.shared().coordinators().ackQueryDone(mvccCrd0, mvccVer0);
-
-                return ctx.coordinators().ackTxCommit(mvccInfo.coordinator(), mvccInfo.version(), null);
+                if (commit)
+                    return ctx.coordinators().ackTxCommit(mvccInfo.coordinatorNodeId(), mvccInfo.version(), null);
+                else
+                    ctx.coordinators().ackTxRollback(mvccInfo.coordinatorNodeId(), mvccInfo.version(), null);
             }
         }
 
-        if (mvccInfo != null)
-            return ctx.coordinators().ackTxCommit(mvccInfo.coordinator(), mvccInfo.version(), null);
-
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b73792ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/TxMvccInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/TxMvccInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/TxMvccInfo.java
index 428d707..2306110 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/TxMvccInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/TxMvccInfo.java
@@ -42,8 +42,8 @@ public class TxMvccInfo implements Message {
     }
 
     /**
-     * @param crd
-     * @param mvccVer
+     * @param crd Coordinator node ID.
+     * @param mvccVer Mvcc version.
      */
     public TxMvccInfo(UUID crd, MvccCoordinatorVersion mvccVer) {
         assert crd != null;
@@ -53,10 +53,16 @@ public class TxMvccInfo implements Message {
         this.mvccVer = mvccVer;
     }
 
-    public UUID coordinator() {
+    /**
+     * @return Coordinator node ID.
+     */
+    public UUID coordinatorNodeId() {
         return crd;
     }
 
+    /**
+     * @return Mvcc version.
+     */
     public MvccCoordinatorVersion version() {
         return mvccVer;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b73792ae/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index 8964cd4..70b910b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -336,6 +336,21 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testActiveQueriesCleanup() throws Exception {
+        activeQueriesCleanup(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testActiveQueriesCleanupTx() throws Exception {
+        activeQueriesCleanup(true);
+    }
+
+    /**
+     * @param tx If {@code true} tests reads inside transaction.
+     * @throws Exception If failed.
+     */
+    private void activeQueriesCleanup(final boolean tx) throws Exception {
         startGridsMultiThreaded(SRVS);
 
         client = true;
@@ -354,7 +369,11 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
             @Override public void apply(Integer idx) {
                 ThreadLocalRandom rnd = ThreadLocalRandom.current();
 
-                IgniteCache cache = ignite(idx % NODES).cache(DEFAULT_CACHE_NAME);
+                Ignite node = ignite(idx % NODES);
+
+                IgniteTransactions txs = node.transactions();
+
+                IgniteCache cache = node.cache(DEFAULT_CACHE_NAME);
 
                 while (System.currentTimeMillis() < stopTime) {
                     int keyCnt = rnd.nextInt(10) + 1;
@@ -364,7 +383,18 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
                     for (int i = 0; i < keyCnt; i++)
                         keys.add(rnd.nextInt());
 
-                    cache.getAll(keys);
+                    if (tx) {
+                        try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                            cache.getAll(keys);
+
+                            if (rnd.nextBoolean())
+                                tx.commit();
+                            else
+                                tx.rollback();
+                        }
+                    }
+                    else
+                        cache.getAll(keys);
                 }
             }
         }, NODES * 2, "get-thread");


[09/10] ignite git commit: Merge remote-tracking branch 'remotes/origin/ignite-3478' into ignite-5932

Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-3478' into ignite-5932


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/031928e9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/031928e9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/031928e9

Branch: refs/heads/ignite-5932
Commit: 031928e9f1ab1469370f500e57a4b88c0eb8580a
Parents: d6670e8 f29d4bc
Author: sboikov <sb...@gridgain.com>
Authored: Thu Oct 12 13:45:19 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Oct 12 13:45:19 2017 +0300

----------------------------------------------------------------------
 .../internal/MarshallerMappingFileStore.java    |   15 +-
 .../cache/mvcc/CacheCoordinatorsProcessor.java  |   12 +-
 .../service/GridServiceProcessor.java           |   11 +-
 .../nio/GridAbstractCommunicationClient.java    |    2 +-
 .../communication/tcp/TcpCommunicationSpi.java  |  378 +++---
 .../IgniteMarshallerCacheFSRestoreTest.java     |   71 +-
 .../Apache.Ignite.Core.csproj                   |    1 -
 .../Impl/Binary/Io/BinaryHeapStream.cs          | 1018 +++++++++++++-
 .../Impl/Binary/Io/BinaryStreamBase.cs          | 1249 ------------------
 9 files changed, 1247 insertions(+), 1510 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/031928e9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
----------------------------------------------------------------------


[05/10] ignite git commit: IGNITE-5928 .NET: Get rid of BinaryStreamBase

Posted by sb...@apache.org.
IGNITE-5928 .NET: Get rid of BinaryStreamBase

This closes #2835


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b8b7c508
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b8b7c508
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b8b7c508

Branch: refs/heads/ignite-5932
Commit: b8b7c50865d37449616cc6dadeeaba84526945f4
Parents: 5490c7d
Author: Alexey Popov <ta...@gmail.com>
Authored: Thu Oct 12 12:44:00 2017 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Thu Oct 12 12:44:00 2017 +0300

----------------------------------------------------------------------
 .../Apache.Ignite.Core.csproj                   |    1 -
 .../Impl/Binary/Io/BinaryHeapStream.cs          | 1018 +++++++++++++-
 .../Impl/Binary/Io/BinaryStreamBase.cs          | 1249 ------------------
 3 files changed, 958 insertions(+), 1310 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b8b7c508/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
index 58abd26..446208a 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
@@ -431,7 +431,6 @@
     <Compile Include="Impl\Messaging\Messaging.cs" />
     <Compile Include="Impl\NativeMethods.cs" />
     <Compile Include="Impl\Binary\IO\IBinaryStream.cs" />
-    <Compile Include="Impl\Binary\IO\BinaryStreamBase.cs" />
     <Compile Include="Impl\Binary\IO\BinaryHeapStream.cs" />
     <Compile Include="Impl\Binary\IBinaryTypeDescriptor.cs" />
     <Compile Include="Impl\Binary\IBinaryWriteAware.cs" />

http://git-wip-us.apache.org/repos/asf/ignite/blob/b8b7c508/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryHeapStream.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryHeapStream.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryHeapStream.cs
index a14da0a..a6082f1 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryHeapStream.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryHeapStream.cs
@@ -22,12 +22,28 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
     using System.Diagnostics.CodeAnalysis;
     using System.IO;
     using System.Text;
+    using Apache.Ignite.Core.Impl.Memory;
 
     /// <summary>
     /// Binary onheap stream.
     /// </summary>
-    internal unsafe class BinaryHeapStream : BinaryStreamBase
+    internal unsafe class BinaryHeapStream : IBinaryStream
     {
+        /** Byte: zero. */
+        private const byte ByteZero = 0;
+
+        /** Byte: one. */
+        private const byte ByteOne = 1;
+
+        /** LITTLE_ENDIAN flag. */
+        private static readonly bool LittleEndian = BitConverter.IsLittleEndian;
+
+        /** Position. */
+        private int _pos;
+
+        /** Disposed flag. */
+        private bool _disposed;
+
         /** Data array. */
         private byte[] _data;
 
@@ -53,8 +69,903 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
             _data = data;
         }
 
+        /// <summary>
+        /// Copies the entire byte array into the memory addressed by the data pointer.
+        /// </summary>
+        /// <param name="val">Source byte array.</param>
+        /// <param name="data">Destination pointer.</param>
+        private static void WriteByteArray0(byte[] val, byte* data)
+        {
+            int len = val.Length;
+
+            fixed (byte* pinned = val)
+            {
+                CopyMemory(pinned, data, len);
+            }
+        }
+
+        /// <summary>
+        /// Allocates a new byte array and fills it from the memory addressed by the data pointer.
+        /// </summary>
+        /// <param name="len">Number of bytes to read; also the length of the returned array.</param>
+        /// <param name="data">Source pointer.</param>
+        /// <returns>Newly allocated byte array.</returns>
+        private static byte[] ReadByteArray0(int len, byte* data)
+        {
+            byte[] buf = new byte[len];
+
+            fixed (byte* pinned = buf)
+            {
+                CopyMemory(data, pinned, len);
+            }
+
+            return buf;
+        }
+
+        /** <inheritdoc /> */
+        public void WriteBool(bool val)
+        {
+            // A boolean is written as a single byte: ByteOne for true, ByteZero for false.
+            byte flag = val ? ByteOne : ByteZero;
+
+            WriteByte(flag);
+        }
+
+        /** <inheritdoc /> */
+        public bool ReadBool()
+        {
+            // Only a byte equal to ByteOne is treated as true; any other value reads as false.
+            byte flag = ReadByte();
+
+            return flag == ByteOne;
+        }
+
+        /// <summary>
+        /// Copies the bool array into the memory addressed by the data pointer.
+        /// The number of bytes copied equals the array length.
+        /// </summary>
+        /// <param name="val">Source bool array.</param>
+        /// <param name="data">Destination pointer.</param>
+        private static void WriteBoolArray0(bool[] val, byte* data)
+        {
+            int len = val.Length;
+
+            fixed (bool* pinned = val)
+            {
+                CopyMemory((byte*)pinned, data, len);
+            }
+        }
+
+        /// <summary>
+        /// Allocates a new bool array and fills it from the memory addressed by the data pointer.
+        /// The number of bytes copied equals the requested array length.
+        /// </summary>
+        /// <param name="len">Array length.</param>
+        /// <param name="data">Source pointer.</param>
+        /// <returns>Newly allocated bool array.</returns>
+        private static bool[] ReadBoolArray0(int len, byte* data)
+        {
+            bool[] flags = new bool[len];
+
+            fixed (bool* pinned = flags)
+            {
+                CopyMemory(data, (byte*)pinned, len);
+            }
+
+            return flags;
+        }
+
+        /// <summary>
+        /// Stores a short value at the given pointer. When <c>LittleEndian</c> is set the value
+        /// is stored with a single typed write; otherwise its two bytes are stored in reversed order.
+        /// </summary>
+        /// <param name="val">Short value.</param>
+        /// <param name="data">Destination pointer.</param>
+        private static void WriteShort0(short val, byte* data)
+        {
+            if (LittleEndian)
+            {
+                *(short*)data = val;
+
+                return;
+            }
+
+            byte* src = (byte*)&val;
+
+            data[0] = src[1];
+            data[1] = src[0];
+        }
+
+        /// <summary>
+        /// Loads a short value from the given pointer. When <c>LittleEndian</c> is set the value
+        /// is loaded with a single typed read; otherwise its two bytes are taken in reversed order.
+        /// </summary>
+        /// <param name="data">Source pointer.</param>
+        /// <returns>Short value.</returns>
+        private static short ReadShort0(byte* data)
+        {
+            if (LittleEndian)
+            {
+                return *(short*)data;
+            }
+
+            short res;
+
+            byte* dst = (byte*)&res;
+
+            dst[0] = data[1];
+            dst[1] = data[0];
+
+            return res;
+        }
+
+        /// <summary>
+        /// Stores a short array at the given pointer.
+        /// </summary>
+        /// <param name="val">Short array.</param>
+        /// <param name="data">Destination pointer.</param>
+        /// <param name="cnt">Bytes count; used only by the bulk-copy path.</param>
+        private static void WriteShortArray0(short[] val, byte* data, int cnt)
+        {
+            if (!LittleEndian)
+            {
+                // Element-wise path: emit the two bytes of every element in reversed order.
+                byte* dst = data;
+
+                for (int idx = 0; idx < val.Length; idx++)
+                {
+                    short item = val[idx];
+
+                    byte* itemPtr = (byte*)&item;
+
+                    *dst++ = itemPtr[1];
+                    *dst++ = itemPtr[0];
+                }
+
+                return;
+            }
+
+            // Bulk path: raw copy of cnt bytes out of the pinned array.
+            fixed (short* pinned = val)
+            {
+                CopyMemory((byte*)pinned, data, cnt);
+            }
+        }
+
+        /// <summary>
+        /// Loads a short array from the given pointer.
+        /// </summary>
+        /// <param name="len">Array length (element count).</param>
+        /// <param name="data">Source pointer.</param>
+        /// <param name="cnt">Bytes count; used only by the bulk-copy path.</param>
+        /// <returns>Newly allocated short array.</returns>
+        private static short[] ReadShortArray0(int len, byte* data, int cnt)
+        {
+            short[] items = new short[len];
+
+            if (LittleEndian)
+            {
+                // Bulk path: raw copy of cnt bytes into the pinned array.
+                fixed (short* pinned = items)
+                {
+                    CopyMemory(data, (byte*)pinned, cnt);
+                }
+
+                return items;
+            }
+
+            // Element-wise path: consume two bytes per element, filling them in reversed order.
+            for (int idx = 0; idx < len; idx++)
+            {
+                short item;
+
+                byte* itemPtr = (byte*)&item;
+
+                itemPtr[1] = *data++;
+                itemPtr[0] = *data++;
+
+                items[idx] = item;
+            }
+
+            return items;
+        }
+
+        /** <inheritdoc /> */
+        public void WriteChar(char val)
+        {
+            // Reinterpret the char's bits as a short and reuse the short writer.
+            short bits = *(short*)&val;
+
+            WriteShort(bits);
+        }
+
+        /** <inheritdoc /> */
+        public char ReadChar()
+        {
+            // Read a short and reinterpret its bits as a char.
+            short bits = ReadShort();
+
+            char* asChar = (char*)&bits;
+
+            return *asChar;
+        }
+
+        /// <summary>
+        /// Stores a char array at the given pointer.
+        /// </summary>
+        /// <param name="val">Char array.</param>
+        /// <param name="data">Destination pointer.</param>
+        /// <param name="cnt">Bytes count; used only by the bulk-copy path.</param>
+        private static void WriteCharArray0(char[] val, byte* data, int cnt)
+        {
+            if (!LittleEndian)
+            {
+                // Element-wise path: emit the two bytes of every element in reversed order.
+                byte* dst = data;
+
+                for (int idx = 0; idx < val.Length; idx++)
+                {
+                    char item = val[idx];
+
+                    byte* itemPtr = (byte*)&item;
+
+                    *dst++ = itemPtr[1];
+                    *dst++ = itemPtr[0];
+                }
+
+                return;
+            }
+
+            // Bulk path: raw copy of cnt bytes out of the pinned array.
+            fixed (char* pinned = val)
+            {
+                CopyMemory((byte*)pinned, data, cnt);
+            }
+        }
+
+        /// <summary>
+        /// Loads a char array from the given pointer.
+        /// </summary>
+        /// <param name="len">Array length (element count).</param>
+        /// <param name="data">Source pointer.</param>
+        /// <param name="cnt">Bytes count; used only by the bulk-copy path.</param>
+        /// <returns>Newly allocated char array.</returns>
+        private static char[] ReadCharArray0(int len, byte* data, int cnt)
+        {
+            char[] items = new char[len];
+
+            if (LittleEndian)
+            {
+                // Bulk path: raw copy of cnt bytes into the pinned array.
+                fixed (char* pinned = items)
+                {
+                    CopyMemory(data, (byte*)pinned, cnt);
+                }
+
+                return items;
+            }
+
+            // Element-wise path: consume two bytes per element, filling them in reversed order.
+            for (int idx = 0; idx < len; idx++)
+            {
+                char item;
+
+                byte* itemPtr = (byte*)&item;
+
+                itemPtr[1] = *data++;
+                itemPtr[0] = *data++;
+
+                items[idx] = item;
+            }
+
+            return items;
+        }
+
+        /// <summary>
+        /// Stores an int value at the given pointer. When <c>LittleEndian</c> is set the value
+        /// is stored with a single typed write; otherwise its four bytes are stored in reversed order.
+        /// </summary>
+        /// <param name="val">Int value.</param>
+        /// <param name="data">Destination pointer.</param>
+        private static void WriteInt0(int val, byte* data)
+        {
+            if (LittleEndian)
+            {
+                *(int*)data = val;
+
+                return;
+            }
+
+            byte* src = (byte*)&val;
+
+            for (int i = 0; i < 4; i++)
+            {
+                data[i] = src[3 - i];
+            }
+        }
+
+        /// <summary>
+        /// Loads an int value from the given pointer. When <c>LittleEndian</c> is set the value
+        /// is loaded with a single typed read; otherwise its four bytes are taken in reversed order.
+        /// </summary>
+        /// <param name="data">Source pointer.</param>
+        /// <returns>Int value.</returns>
+        private static int ReadInt0(byte* data)
+        {
+            if (LittleEndian)
+            {
+                return *(int*)data;
+            }
+
+            int res;
+
+            byte* dst = (byte*)&res;
+
+            for (int i = 0; i < 4; i++)
+            {
+                dst[i] = data[3 - i];
+            }
+
+            return res;
+        }
+
+        /// <summary>
+        /// Stores an int array at the given pointer.
+        /// </summary>
+        /// <param name="val">Int array.</param>
+        /// <param name="data">Destination pointer.</param>
+        /// <param name="cnt">Bytes count; used only by the bulk-copy path.</param>
+        private static void WriteIntArray0(int[] val, byte* data, int cnt)
+        {
+            if (!LittleEndian)
+            {
+                // Element-wise path: emit the four bytes of every element in reversed order.
+                byte* dst = data;
+
+                for (int idx = 0; idx < val.Length; idx++)
+                {
+                    int item = val[idx];
+
+                    byte* itemPtr = (byte*)&item;
+
+                    for (int b = 3; b >= 0; b--)
+                    {
+                        *dst++ = itemPtr[b];
+                    }
+                }
+
+                return;
+            }
+
+            // Bulk path: raw copy of cnt bytes out of the pinned array.
+            fixed (int* pinned = val)
+            {
+                CopyMemory((byte*)pinned, data, cnt);
+            }
+        }
+
+        /// <summary>
+        /// Loads an int array from the given pointer.
+        /// </summary>
+        /// <param name="len">Array length (element count).</param>
+        /// <param name="data">Source pointer.</param>
+        /// <param name="cnt">Bytes count; used only by the bulk-copy path.</param>
+        /// <returns>Newly allocated int array.</returns>
+        private static int[] ReadIntArray0(int len, byte* data, int cnt)
+        {
+            int[] items = new int[len];
+
+            if (LittleEndian)
+            {
+                // Bulk path: raw copy of cnt bytes into the pinned array.
+                fixed (int* pinned = items)
+                {
+                    CopyMemory(data, (byte*)pinned, cnt);
+                }
+
+                return items;
+            }
+
+            // Element-wise path: consume four bytes per element, filling them in reversed order.
+            for (int idx = 0; idx < len; idx++)
+            {
+                int item;
+
+                byte* itemPtr = (byte*)&item;
+
+                for (int b = 3; b >= 0; b--)
+                {
+                    itemPtr[b] = *data++;
+                }
+
+                items[idx] = item;
+            }
+
+            return items;
+        }
+
+        /** <inheritdoc /> */
+        public void WriteFloat(float val)
+        {
+            // Reinterpret the float's bits as an int and reuse the int writer.
+            WriteInt(*(int*)&val);
+        }
+
+        /** <inheritdoc /> */
+        public float ReadFloat()
+        {
+            // Read the raw int and hand it to BinaryUtils.IntToFloatBits for conversion.
+            return BinaryUtils.IntToFloatBits(ReadInt());
+        }
+
+        /// <summary>
+        /// Stores a float array at the given pointer.
+        /// </summary>
+        /// <param name="val">Float array.</param>
+        /// <param name="data">Destination pointer.</param>
+        /// <param name="cnt">Bytes count; used only by the bulk-copy path.</param>
+        private static void WriteFloatArray0(float[] val, byte* data, int cnt)
+        {
+            if (!LittleEndian)
+            {
+                // Element-wise path: emit the four bytes of every element in reversed order.
+                byte* dst = data;
+
+                for (int idx = 0; idx < val.Length; idx++)
+                {
+                    float item = val[idx];
+
+                    byte* itemPtr = (byte*)&item;
+
+                    for (int b = 3; b >= 0; b--)
+                    {
+                        *dst++ = itemPtr[b];
+                    }
+                }
+
+                return;
+            }
+
+            // Bulk path: raw copy of cnt bytes out of the pinned array.
+            fixed (float* pinned = val)
+            {
+                CopyMemory((byte*)pinned, data, cnt);
+            }
+        }
+
+        /// <summary>
+        /// Internal routine to read float array.
+        /// </summary>
+        /// <param name="len">Count.</param>
+        /// <param name="data">Data pointer.</param>
+        /// <param name="cnt">Bytes count; used only by the bulk-copy path.</param>
+        /// <returns>Float array</returns>
+        private static float[] ReadFloatArray0(int len, byte* data, int cnt)
+        {
+            float[] res = new float[len];
+
+            if (LittleEndian)
+            {
+                fixed (float* res0 = res)
+                {
+                    CopyMemory(data, (byte*)res0, cnt);
+                }
+            }
+            else
+            {
+                for (int i = 0; i < len; i++)
+                {
+                    // The temporary must be a float: assembling the bytes into an int and then
+                    // assigning it to a float element would perform a numeric int-to-float
+                    // conversion instead of preserving the bit pattern that was read.
+                    float val;
+
+                    byte* valPtr = (byte*)&val;
+
+                    valPtr[3] = *data++;
+                    valPtr[2] = *data++;
+                    valPtr[1] = *data++;
+                    valPtr[0] = *data++;
+
+                    res[i] = val;
+                }
+            }
+
+            return res;
+        }
+
+        /// <summary>
+        /// Stores a long value at the given pointer. When <c>LittleEndian</c> is set the value
+        /// is stored with a single typed write; otherwise its eight bytes are stored in reversed order.
+        /// </summary>
+        /// <param name="val">Long value.</param>
+        /// <param name="data">Destination pointer.</param>
+        private static void WriteLong0(long val, byte* data)
+        {
+            if (LittleEndian)
+            {
+                *(long*)data = val;
+
+                return;
+            }
+
+            byte* src = (byte*)&val;
+
+            for (int i = 0; i < 8; i++)
+            {
+                data[i] = src[7 - i];
+            }
+        }
+
+        /// <summary>
+        /// Loads a long value from the given pointer. When <c>LittleEndian</c> is set the value
+        /// is loaded with a single typed read; otherwise its eight bytes are taken in reversed order.
+        /// </summary>
+        /// <param name="data">Source pointer.</param>
+        /// <returns>Long value.</returns>
+        private static long ReadLong0(byte* data)
+        {
+            if (LittleEndian)
+            {
+                return *(long*)data;
+            }
+
+            long res;
+
+            byte* dst = (byte*)&res;
+
+            for (int i = 0; i < 8; i++)
+            {
+                dst[i] = data[7 - i];
+            }
+
+            return res;
+        }
+
+        /// <summary>
+        /// Stores a long array at the given pointer.
+        /// </summary>
+        /// <param name="val">Long array.</param>
+        /// <param name="data">Destination pointer.</param>
+        /// <param name="cnt">Bytes count; used only by the bulk-copy path.</param>
+        private static void WriteLongArray0(long[] val, byte* data, int cnt)
+        {
+            if (!LittleEndian)
+            {
+                // Element-wise path: emit the eight bytes of every element in reversed order.
+                byte* dst = data;
+
+                for (int idx = 0; idx < val.Length; idx++)
+                {
+                    long item = val[idx];
+
+                    byte* itemPtr = (byte*)&item;
+
+                    for (int b = 7; b >= 0; b--)
+                    {
+                        *dst++ = itemPtr[b];
+                    }
+                }
+
+                return;
+            }
+
+            // Bulk path: raw copy of cnt bytes out of the pinned array.
+            fixed (long* pinned = val)
+            {
+                CopyMemory((byte*)pinned, data, cnt);
+            }
+        }
+
+        /// <summary>
+        /// Loads a long array from the given pointer.
+        /// </summary>
+        /// <param name="len">Array length (element count).</param>
+        /// <param name="data">Source pointer.</param>
+        /// <param name="cnt">Bytes count; used only by the bulk-copy path.</param>
+        /// <returns>Newly allocated long array.</returns>
+        private static long[] ReadLongArray0(int len, byte* data, int cnt)
+        {
+            long[] items = new long[len];
+
+            if (LittleEndian)
+            {
+                // Bulk path: raw copy of cnt bytes into the pinned array.
+                fixed (long* pinned = items)
+                {
+                    CopyMemory(data, (byte*)pinned, cnt);
+                }
+
+                return items;
+            }
+
+            // Element-wise path: consume eight bytes per element, filling them in reversed order.
+            for (int idx = 0; idx < len; idx++)
+            {
+                long item;
+
+                byte* itemPtr = (byte*)&item;
+
+                for (int b = 7; b >= 0; b--)
+                {
+                    itemPtr[b] = *data++;
+                }
+
+                items[idx] = item;
+            }
+
+            return items;
+        }
+
+        /** <inheritdoc /> */
+        public void WriteDouble(double val)
+        {
+            // Reinterpret the double's bits as a long and reuse the long writer.
+            WriteLong(*(long*)&val);
+        }
+
+        /** <inheritdoc /> */
+        public double ReadDouble()
+        {
+            // Read the raw long and hand it to BinaryUtils.LongToDoubleBits for conversion.
+            return BinaryUtils.LongToDoubleBits(ReadLong());
+        }
+
+        /// <summary>
+        /// Stores a double array at the given pointer.
+        /// </summary>
+        /// <param name="val">Double array.</param>
+        /// <param name="data">Destination pointer.</param>
+        /// <param name="cnt">Bytes count; used only by the bulk-copy path.</param>
+        private static void WriteDoubleArray0(double[] val, byte* data, int cnt)
+        {
+            if (!LittleEndian)
+            {
+                // Element-wise path: emit the eight bytes of every element in reversed order.
+                byte* dst = data;
+
+                for (int idx = 0; idx < val.Length; idx++)
+                {
+                    double item = val[idx];
+
+                    byte* itemPtr = (byte*)&item;
+
+                    for (int b = 7; b >= 0; b--)
+                    {
+                        *dst++ = itemPtr[b];
+                    }
+                }
+
+                return;
+            }
+
+            // Bulk path: raw copy of cnt bytes out of the pinned array.
+            fixed (double* pinned = val)
+            {
+                CopyMemory((byte*)pinned, data, cnt);
+            }
+        }
+
+        /// <summary>
+        /// Loads a double array from the given pointer.
+        /// </summary>
+        /// <param name="len">Array length (element count).</param>
+        /// <param name="data">Source pointer.</param>
+        /// <param name="cnt">Bytes count; used only by the bulk-copy path.</param>
+        /// <returns>Newly allocated double array.</returns>
+        private static double[] ReadDoubleArray0(int len, byte* data, int cnt)
+        {
+            double[] items = new double[len];
+
+            if (LittleEndian)
+            {
+                // Bulk path: raw copy of cnt bytes into the pinned array.
+                fixed (double* pinned = items)
+                {
+                    CopyMemory(data, (byte*)pinned, cnt);
+                }
+
+                return items;
+            }
+
+            // Element-wise path: consume eight bytes per element, filling them in reversed order.
+            for (int idx = 0; idx < len; idx++)
+            {
+                double item;
+
+                byte* itemPtr = (byte*)&item;
+
+                for (int b = 7; b >= 0; b--)
+                {
+                    itemPtr[b] = *data++;
+                }
+
+                items[idx] = item;
+            }
+
+            return items;
+        }
+
+        /** <inheritdoc /> */
+        public void Write(byte[] src, int off, int cnt)
+        {
+            fixed (byte* pinned = src)
+            {
+                // Delegate to the pointer-based overload, starting at the requested offset.
+                byte* start = pinned + off;
+
+                Write(start, cnt);
+            }
+        }
+
+        /** <inheritdoc /> */
+        public void Read(byte[] dest, int off, int cnt)
+        {
+            fixed (byte* pinned = dest)
+            {
+                // Delegate to the pointer-based overload, starting at the requested offset.
+                byte* start = pinned + off;
+
+                Read(start, cnt);
+            }
+        }
+
+        /// <summary>
+        /// Internal write routine: copies bytes from the source to the destination buffer at the
+        /// current position. The position itself is not changed by this method.
+        /// </summary>
+        /// <param name="src">Source.</param>
+        /// <param name="cnt">Count.</param>
+        /// <param name="data">Data (destination).</param>
+        private void WriteInternal(byte* src, int cnt, byte* data)
+        {
+            byte* target = data + _pos;
+
+            CopyMemory(src, target, cnt);
+        }
+
+        /// <summary>
+        /// Internal read routine: copies bytes from the source buffer at the current position
+        /// into the destination, then advances the position by the number of bytes copied.
+        /// The amount copied is the smaller of <paramref name="cnt"/> and <see cref="Remaining"/>.
+        /// </summary>
+        /// <param name="src">Source.</param>
+        /// <param name="dest">Destination.</param>
+        /// <param name="cnt">Requested count.</param>
+        private void ReadInternal(byte* src, byte* dest, int cnt)
+        {
+            int toCopy = Math.Min(Remaining, cnt);
+
+            byte* from = src + _pos;
+
+            CopyMemory(from, dest, toCopy);
+
+            ShiftRead(toCopy);
+        }
+
+        /** <inheritdoc /> */
+        public int Position
+        {
+            // Returns the value of the _pos field.
+            get { return _pos; }
+        }
+
         /** <inheritdoc /> */
-        public override void WriteByte(byte val)
+        public int Remaining
+        {
+            // Distance from the current position to the end of the data array.
+            get { return _data.Length - _pos; }
+        }
+
+        /// <summary>
+        /// Internal array. The getter returns the <c>_data</c> reference itself, not a copy.
+        /// </summary>
+        internal byte[] InternalArray
+        {
+            get { return _data; }
+        }
+
+        /// <inheritdoc />
+        /// <exception cref="T:System.ArgumentException">
+        /// Thrown when <paramref name="origin"/> is neither <see cref="SeekOrigin.Begin"/> nor
+        /// <see cref="SeekOrigin.Current"/>, or when the resulting position is negative.
+        /// </exception>
+        public int Seek(int offset, SeekOrigin origin)
+        {
+            int target;
+
+            if (origin == SeekOrigin.Begin)
+            {
+                target = offset;
+            }
+            else if (origin == SeekOrigin.Current)
+            {
+                target = _pos + offset;
+            }
+            else
+            {
+                throw new ArgumentException("Unsupported seek origin: " + origin);
+            }
+
+            if (target < 0)
+            {
+                throw new ArgumentException("Seek before origin: " + target);
+            }
+
+            // Capacity is ensured for the target position before the position is moved.
+            EnsureWriteCapacity(target);
+
+            _pos = target;
+
+            return target;
+        }
+
+        /** <inheritdoc /> */
+        public void Flush()
+        {
+            // No-op: the method body is intentionally empty.
+        }
+
+        /** <inheritdoc /> */
+        public void Dispose()
+        {
+            // Only the first call does any work; later calls find the flag set and do nothing.
+            if (!_disposed)
+            {
+                GC.SuppressFinalize(this);
+
+                _disposed = true;
+            }
+        }
+
+        /// <summary>
+        /// Ensures write capacity for <paramref name="cnt"/> more bytes past the current position,
+        /// then advances the position by <paramref name="cnt"/>.
+        /// </summary>
+        /// <param name="cnt">Bytes count.</param>
+        /// <returns>Position as it was before the shift.</returns>
+        private int EnsureWriteCapacityAndShift(int cnt)
+        {
+            int start = _pos;
+
+            EnsureWriteCapacity(start + cnt);
+
+            _pos += cnt;
+
+            return start;
+        }
+
+        /// <summary>
+        /// Ensures read capacity for <paramref name="cnt"/> bytes via <c>EnsureReadCapacity</c>,
+        /// then advances the position by <paramref name="cnt"/>.
+        /// </summary>
+        /// <param name="cnt">Bytes count.</param>
+        /// <returns>Position as it was before the shift.</returns>
+        private int EnsureReadCapacityAndShift(int cnt)
+        {
+            int start = _pos;
+
+            EnsureReadCapacity(cnt);
+
+            _pos += cnt;
+
+            return start;
+        }
+
+        /// <summary>
+        /// Advances the position by the number of bytes written.
+        /// </summary>
+        /// <param name="cnt">Bytes count.</param>
+        private void ShiftWrite(int cnt)
+        {
+            _pos = _pos + cnt;
+        }
+
+        /// <summary>
+        /// Advances the position by the number of bytes read.
+        /// </summary>
+        /// <param name="cnt">Bytes count.</param>
+        private void ShiftRead(int cnt)
+        {
+            _pos = _pos + cnt;
+        }
+
+        /// <summary>
+        /// Calculates a new capacity: 256 when the required capacity is below 256, otherwise
+        /// the larger of twice the current capacity and the required capacity.
+        /// </summary>
+        /// <param name="curCap">Current capacity.</param>
+        /// <param name="reqCap">Required capacity.</param>
+        /// <returns>New capacity.</returns>
+        private static int Capacity(int curCap, int reqCap)
+        {
+            if (reqCap < 256)
+            {
+                return 256;
+            }
+
+            int doubled = curCap << 1;
+
+            return Math.Max(doubled, reqCap);
+        }
+
+        /// <summary>
+        /// Unsafe memory copy routine. Forwards its arguments unchanged to
+        /// <c>PlatformMemoryUtils.CopyMemory</c>.
+        /// </summary>
+        /// <param name="src">Source.</param>
+        /// <param name="dest">Destination.</param>
+        /// <param name="len">Length.</param>
+        private static void CopyMemory(byte* src, byte* dest, int len)
+        {
+            PlatformMemoryUtils.CopyMemory(src, dest, len);
+        }
+
+        /** <inheritdoc /> */
+        public void WriteByte(byte val)
         {
             int pos0 = EnsureWriteCapacityAndShift(1);
 
@@ -62,7 +973,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
         }
 
         /** <inheritdoc /> */
-        public override byte ReadByte()
+        public byte ReadByte()
         {
             int pos0 = EnsureReadCapacityAndShift(1);
 
@@ -71,7 +982,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
 
         /** <inheritdoc /> */
         [SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
-        public override void WriteByteArray(byte[] val)
+        public void WriteByteArray(byte[] val)
         {
             int pos0 = EnsureWriteCapacityAndShift(val.Length);
 
@@ -82,7 +993,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
         }
 
         /** <inheritdoc /> */
-        public override byte[] ReadByteArray(int cnt)
+        public byte[] ReadByteArray(int cnt)
         {
             int pos0 = EnsureReadCapacityAndShift(cnt);
 
@@ -94,7 +1005,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
 
         /** <inheritdoc /> */
         [SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
-        public override void WriteBoolArray(bool[] val)
+        public void WriteBoolArray(bool[] val)
         {
             int pos0 = EnsureWriteCapacityAndShift(val.Length);
 
@@ -105,7 +1016,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
         }
 
         /** <inheritdoc /> */
-        public override bool[] ReadBoolArray(int cnt)
+        public bool[] ReadBoolArray(int cnt)
         {
             int pos0 = EnsureReadCapacityAndShift(cnt);
 
@@ -116,7 +1027,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
         }
 
         /** <inheritdoc /> */
-        public override void WriteShort(short val)
+        public void WriteShort(short val)
         {
             int pos0 = EnsureWriteCapacityAndShift(2);
 
@@ -127,7 +1038,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
         }
 
         /** <inheritdoc /> */
-        public override short ReadShort()
+        public short ReadShort()
         {
             int pos0 = EnsureReadCapacityAndShift(2);
 
@@ -139,7 +1050,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
 
         /** <inheritdoc /> */
         [SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
-        public override void WriteShortArray(short[] val)
+        public void WriteShortArray(short[] val)
         {
             int cnt = val.Length << 1;
 
@@ -152,7 +1063,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
         }
 
         /** <inheritdoc /> */
-        public override short[] ReadShortArray(int cnt)
+        public short[] ReadShortArray(int cnt)
         {
             int cnt0 = cnt << 1;
 
@@ -166,7 +1077,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
 
         /** <inheritdoc /> */
         [SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
-        public override void WriteCharArray(char[] val)
+        public void WriteCharArray(char[] val)
         {
             int cnt = val.Length << 1;
 
@@ -179,7 +1090,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
         }
 
         /** <inheritdoc /> */
-        public override char[] ReadCharArray(int cnt)
+        public char[] ReadCharArray(int cnt)
         {
             int cnt0 = cnt << 1;
 
@@ -192,7 +1103,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
         }
 
         /** <inheritdoc /> */
-        public override void WriteInt(int val)
+        public void WriteInt(int val)
         {
             int pos0 = EnsureWriteCapacityAndShift(4);
 
@@ -203,7 +1114,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
         }
 
         /** <inheritdoc /> */
-        public override void WriteInt(int writePos, int val)
+        public void WriteInt(int writePos, int val)
         {
             EnsureWriteCapacity(writePos + 4);
 
@@ -214,7 +1125,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
         }
 
         /** <inheritdoc /> */
-        public override int ReadInt()
+        public int ReadInt()
         {
             int pos0 = EnsureReadCapacityAndShift(4);
 
@@ -226,7 +1137,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
 
         /** <inheritdoc /> */
         [SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
-        public override void WriteIntArray(int[] val)
+        public void WriteIntArray(int[] val)
         {
             int cnt = val.Length << 2;
 
@@ -239,7 +1150,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
         }
 
         /** <inheritdoc /> */
-        public override int[] ReadIntArray(int cnt)
+        public int[] ReadIntArray(int cnt)
         {
             int cnt0 = cnt << 2;
 
@@ -253,7 +1164,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
 
         /** <inheritdoc /> */
         [SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
-        public override void WriteFloatArray(float[] val)
+        public void WriteFloatArray(float[] val)
         {
             int cnt = val.Length << 2;
 
@@ -266,7 +1177,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
         }
 
         /** <inheritdoc /> */
-        public override float[] ReadFloatArray(int cnt)
+        public float[] ReadFloatArray(int cnt)
         {
             int cnt0 = cnt << 2;
 
@@ -279,7 +1190,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
         }
 
         /** <inheritdoc /> */
-        public override void WriteLong(long val)
+        public void WriteLong(long val)
         {
             int pos0 = EnsureWriteCapacityAndShift(8);
 
@@ -290,7 +1201,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
         }
 
         /** <inheritdoc /> */
-        public override long ReadLong()
+        public long ReadLong()
         {
             int pos0 = EnsureReadCapacityAndShift(8);
 
@@ -302,7 +1213,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
 
         /** <inheritdoc /> */
         [SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
-        public override void WriteLongArray(long[] val)
+        public void WriteLongArray(long[] val)
         {
             int cnt = val.Length << 3;
 
@@ -315,7 +1226,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
         }
 
         /** <inheritdoc /> */
-        public override long[] ReadLongArray(int cnt)
+        public long[] ReadLongArray(int cnt)
         {
             int cnt0 = cnt << 3;
 
@@ -329,7 +1240,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
 
         /** <inheritdoc /> */
         [SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
-        public override void WriteDoubleArray(double[] val)
+        public void WriteDoubleArray(double[] val)
         {
             int cnt = val.Length << 3;
 
@@ -342,7 +1253,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
         }
 
         /** <inheritdoc /> */
-        public override double[] ReadDoubleArray(int cnt)
+        public double[] ReadDoubleArray(int cnt)
         {
             int cnt0 = cnt << 3;
 
@@ -355,7 +1266,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
         }
 
         /** <inheritdoc /> */
-        public override int WriteString(char* chars, int charCnt, int byteCnt, Encoding encoding)
+        public int WriteString(char* chars, int charCnt, int byteCnt, Encoding encoding)
         {
             int pos0 = EnsureWriteCapacityAndShift(byteCnt);
 
@@ -370,9 +1281,9 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
         }
 
         /** <inheritdoc /> */
-        public override void Write(byte* src, int cnt)
+        public void Write(byte* src, int cnt)
         {
-            EnsureWriteCapacity(Pos + cnt);
+            EnsureWriteCapacity(_pos + cnt);
 
             fixed (byte* data0 = _data)
             {
@@ -383,7 +1294,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
         }
 
         /** <inheritdoc /> */
-        public override void Read(byte* dest, int cnt)
+        public void Read(byte* dest, int cnt)
         {
             fixed (byte* data0 = _data)
             {
@@ -392,36 +1303,30 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
         }
 
         /** <inheritdoc /> */
-        public override int Remaining
-        {
-            get { return _data.Length - Pos; }
-        }
-
-        /** <inheritdoc /> */
-        public override byte[] GetArray()
+        public byte[] GetArray()
         {
             return _data;
         }
 
         /** <inheritdoc /> */
-        public override byte[] GetArrayCopy()
+        public byte[] GetArrayCopy()
         {
-            byte[] copy = new byte[Pos];
+            byte[] copy = new byte[_pos];
 
-            Buffer.BlockCopy(_data, 0, copy, 0, Pos);
+            Buffer.BlockCopy(_data, 0, copy, 0, _pos);
 
             return copy;
         }
 
         /** <inheritdoc /> */
-        public override bool IsSameArray(byte[] arr)
+        public bool IsSameArray(byte[] arr)
         {
             return _data == arr;
         }
 
         /** <inheritdoc /> */
         [SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
-        public override T Apply<TArg, T>(IBinaryStreamProcessor<TArg, T> proc, TArg arg)
+        public T Apply<TArg, T>(IBinaryStreamProcessor<TArg, T> proc, TArg arg)
         {
             Debug.Assert(proc != null);
 
@@ -431,22 +1336,11 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
             }
         }
 
-        /** <inheritdoc /> */
-        protected override void Dispose(bool disposing)
-        {
-            // No-op.
-        }
-
         /// <summary>
-        /// Internal array.
+        /// Ensure capacity for write.
         /// </summary>
-        internal byte[] InternalArray
-        {
-            get { return _data; }
-        }
-
-        /** <inheritdoc /> */
-        protected override void EnsureWriteCapacity(int cnt)
+        /// <param name="cnt">Bytes count.</param>
+        private void EnsureWriteCapacity(int cnt)
         {
             if (cnt > _data.Length)
             {
@@ -462,12 +1356,16 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
             }
         }
 
-        /** <inheritdoc /> */
-        protected override void EnsureReadCapacity(int cnt)
+        /// <summary>
+        /// Ensure capacity for read.
+        /// </summary>
+        /// <param name="cnt">Bytes count.</param>
+        /// <exception cref="EndOfStreamException">Fewer than <c>cnt</c> bytes remain in the stream.</exception>
+        private void EnsureReadCapacity(int cnt)
         {
-            if (_data.Length - Pos < cnt)
+            if (_data.Length - _pos < cnt)
                 throw new EndOfStreamException("Not enough data in stream [expected=" + cnt +
-                    ", remaining=" + (_data.Length - Pos) + ']');
+                                               ", remaining=" + (_data.Length - _pos) + ']');
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b8b7c508/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryStreamBase.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryStreamBase.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryStreamBase.cs
deleted file mode 100644
index 0b855f8..0000000
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryStreamBase.cs
+++ /dev/null
@@ -1,1249 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache.Ignite.Core.Impl.Binary.IO
-{
-    using System;
-    using System.IO;
-    using System.Text;
-    using Apache.Ignite.Core.Impl.Memory;
-
-    /// <summary>
-    /// Base class for managed and unmanaged data streams.
-    /// </summary>
-    internal abstract unsafe class BinaryStreamBase : IBinaryStream
-    {
-        /** Byte: zero. */
-        private const byte ByteZero = 0;
-
-        /** Byte: one. */
-        private const byte ByteOne = 1;
-
-        /** LITTLE_ENDIAN flag. */
-        private static readonly bool LittleEndian = BitConverter.IsLittleEndian;
-
-        /** Position. */
-        protected int Pos;
-
-        /** Disposed flag. */
-        private bool _disposed;
-
-        /// <summary>
-        /// Write byte.
-        /// </summary>
-        /// <param name="val">Byte value.</param>
-        public abstract void WriteByte(byte val);
-
-        /// <summary>
-        /// Read byte.
-        /// </summary>
-        /// <returns>
-        /// Byte value.
-        /// </returns>
-        public abstract byte ReadByte();
-
-        /// <summary>
-        /// Write byte array.
-        /// </summary>
-        /// <param name="val">Byte array.</param>
-        public abstract void WriteByteArray(byte[] val);
-
-        /// <summary>
-        /// Internal routine to write byte array.
-        /// </summary>
-        /// <param name="val">Byte array.</param>
-        /// <param name="data">Data pointer.</param>
-        protected static void WriteByteArray0(byte[] val, byte* data)
-        {
-            fixed (byte* val0 = val)
-            {
-                CopyMemory(val0, data, val.Length);
-            }
-        }
-
-        /// <summary>
-        /// Read byte array.
-        /// </summary>
-        /// <param name="cnt">Count.</param>
-        /// <returns>
-        /// Byte array.
-        /// </returns>
-        public abstract byte[] ReadByteArray(int cnt);
-
-        /// <summary>
-        /// Internal routine to read byte array.
-        /// </summary>
-        /// <param name="len">Array length.</param>
-        /// <param name="data">Data pointer.</param>
-        /// <returns>Byte array</returns>
-        protected static byte[] ReadByteArray0(int len, byte* data)
-        {
-            byte[] res = new byte[len];
-
-            fixed (byte* res0 = res)
-            {
-                CopyMemory(data, res0, len);
-            }
-
-            return res;
-        }
-
-        /// <summary>
-        /// Write bool.
-        /// </summary>
-        /// <param name="val">Bool value.</param>
-        public void WriteBool(bool val)
-        {
-            WriteByte(val ? ByteOne : ByteZero);
-        }
-
-        /// <summary>
-        /// Read bool.
-        /// </summary>
-        /// <returns>
-        /// Bool value.
-        /// </returns>
-        public bool ReadBool()
-        {
-            return ReadByte() == ByteOne;
-        }
-
-        /// <summary>
-        /// Write bool array.
-        /// </summary>
-        /// <param name="val">Bool array.</param>
-        public abstract void WriteBoolArray(bool[] val);
-
-        /// <summary>
-        /// Internal routine to write bool array.
-        /// </summary>
-        /// <param name="val">Bool array.</param>
-        /// <param name="data">Data pointer.</param>
-        protected static void WriteBoolArray0(bool[] val, byte* data)
-        {
-            fixed (bool* val0 = val)
-            {
-                CopyMemory((byte*)val0, data, val.Length);
-            }
-        }
-
-        /// <summary>
-        /// Read bool array.
-        /// </summary>
-        /// <param name="cnt">Count.</param>
-        /// <returns>
-        /// Bool array.
-        /// </returns>
-        public abstract bool[] ReadBoolArray(int cnt);
-
-        /// <summary>
-        /// Internal routine to read bool array.
-        /// </summary>
-        /// <param name="len">Array length.</param>
-        /// <param name="data">Data pointer.</param>
-        /// <returns>Bool array</returns>
-        protected static bool[] ReadBoolArray0(int len, byte* data)
-        {
-            bool[] res = new bool[len];
-
-            fixed (bool* res0 = res)
-            {
-                CopyMemory(data, (byte*)res0, len);
-            }
-
-            return res;
-        }
-
-        /// <summary>
-        /// Write short.
-        /// </summary>
-        /// <param name="val">Short value.</param>
-        public abstract void WriteShort(short val);
-
-        /// <summary>
-        /// Internal routine to write short value.
-        /// </summary>
-        /// <param name="val">Short value.</param>
-        /// <param name="data">Data pointer.</param>
-        protected static void WriteShort0(short val, byte* data)
-        {
-            if (LittleEndian)
-                *((short*)data) = val;
-            else
-            {
-                byte* valPtr = (byte*)&val;
-
-                data[0] = valPtr[1];
-                data[1] = valPtr[0];
-            }
-        }
-
-        /// <summary>
-        /// Read short.
-        /// </summary>
-        /// <returns>
-        /// Short value.
-        /// </returns>
-        public abstract short ReadShort();
-
-        /// <summary>
-        /// Internal routine to read short value.
-        /// </summary>
-        /// <param name="data">Data pointer.</param>
-        /// <returns>Short value</returns>
-        protected static short ReadShort0(byte* data)
-        {
-            short val;
-
-            if (LittleEndian)
-                val = *((short*)data);
-            else
-            {
-                byte* valPtr = (byte*)&val;
-
-                valPtr[0] = data[1];
-                valPtr[1] = data[0];
-            }
-
-            return val;
-        }
-
-        /// <summary>
-        /// Write short array.
-        /// </summary>
-        /// <param name="val">Short array.</param>
-        public abstract void WriteShortArray(short[] val);
-
-        /// <summary>
-        /// Internal routine to write short array.
-        /// </summary>
-        /// <param name="val">Short array.</param>
-        /// <param name="data">Data pointer.</param>
-        /// <param name="cnt">Bytes count.</param>
-        protected static void WriteShortArray0(short[] val, byte* data, int cnt)
-        {
-            if (LittleEndian)
-            {
-                fixed (short* val0 = val)
-                {
-                    CopyMemory((byte*)val0, data, cnt);
-                }
-            }
-            else
-            {
-                byte* curPos = data;
-
-                for (int i = 0; i < val.Length; i++)
-                {
-                    short val0 = val[i];
-
-                    byte* valPtr = (byte*)&(val0);
-                    
-                    *curPos++ = valPtr[1];
-                    *curPos++ = valPtr[0];
-                }
-            }
-        }
-
-        /// <summary>
-        /// Read short array.
-        /// </summary>
-        /// <param name="cnt">Count.</param>
-        /// <returns>
-        /// Short array.
-        /// </returns>
-        public abstract short[] ReadShortArray(int cnt);
-
-        /// <summary>
-        /// Internal routine to read short array.
-        /// </summary>
-        /// <param name="len">Array length.</param>
-        /// <param name="data">Data pointer.</param>
-        /// <param name="cnt">Bytes count.</param>
-        /// <returns>Short array</returns>
-        protected static short[] ReadShortArray0(int len, byte* data, int cnt)
-        {
-            short[] res = new short[len];
-
-            if (LittleEndian)
-            {
-                fixed (short* res0 = res)
-                {
-                    CopyMemory(data, (byte*)res0, cnt);
-                }
-            }
-            else
-            {
-                for (int i = 0; i < len; i++)
-                {
-                    short val;
-
-                    byte* valPtr = (byte*)&val;
-
-                    valPtr[1] = *data++;
-                    valPtr[0] = *data++;
-
-                    res[i] = val;
-                }
-            }
-
-            return res;
-        }
-
-        /// <summary>
-        /// Write char.
-        /// </summary>
-        /// <param name="val">Char value.</param>
-        public void WriteChar(char val)
-        {
-            WriteShort(*(short*)(&val));
-        }
-
-        /// <summary>
-        /// Read char.
-        /// </summary>
-        /// <returns>
-        /// Char value.
-        /// </returns>
-        public char ReadChar()
-        {
-            short val = ReadShort();
-
-            return *(char*)(&val);
-        }
-
-        /// <summary>
-        /// Write char array.
-        /// </summary>
-        /// <param name="val">Char array.</param>
-        public abstract void WriteCharArray(char[] val);
-
-        /// <summary>
-        /// Internal routine to write char array.
-        /// </summary>
-        /// <param name="val">Char array.</param>
-        /// <param name="data">Data pointer.</param>
-        /// <param name="cnt">Bytes count.</param>
-        protected static void WriteCharArray0(char[] val, byte* data, int cnt)
-        {
-            if (LittleEndian)
-            {
-                fixed (char* val0 = val)
-                {
-                    CopyMemory((byte*)val0, data, cnt);
-                }
-            }
-            else
-            {
-                byte* curPos = data;
-
-                for (int i = 0; i < val.Length; i++)
-                {
-                    char val0 = val[i];
-
-                    byte* valPtr = (byte*)&(val0);
-
-                    *curPos++ = valPtr[1];
-                    *curPos++ = valPtr[0];
-                }
-            }
-        }
-
-        /// <summary>
-        /// Read char array.
-        /// </summary>
-        /// <param name="cnt">Count.</param>
-        /// <returns>
-        /// Char array.
-        /// </returns>
-        public abstract char[] ReadCharArray(int cnt);
-
-        /// <summary>
-        /// Internal routine to read char array.
-        /// </summary>
-        /// <param name="len">Count.</param>
-        /// <param name="data">Data pointer.</param>
-        /// <param name="cnt">Bytes count.</param>
-        /// <returns>Char array</returns>
-        protected static char[] ReadCharArray0(int len, byte* data, int cnt)
-        {
-            char[] res = new char[len];
-
-            if (LittleEndian)
-            {
-                fixed (char* res0 = res)
-                {
-                    CopyMemory(data, (byte*)res0, cnt);
-                }
-            }
-            else
-            {
-                for (int i = 0; i < len; i++)
-                {
-                    char val;
-
-                    byte* valPtr = (byte*)&val;
-
-                    valPtr[1] = *data++;
-                    valPtr[0] = *data++;
-
-                    res[i] = val;
-                }
-            }
-
-            return res;
-        }
-
-        /// <summary>
-        /// Write int.
-        /// </summary>
-        /// <param name="val">Int value.</param>
-        public abstract void WriteInt(int val);
-
-        /// <summary>
-        /// Write int to specific position.
-        /// </summary>
-        /// <param name="writePos">Position.</param>
-        /// <param name="val">Value.</param>
-        public abstract void WriteInt(int writePos, int val);
-
-        /// <summary>
-        /// Internal routine to write int value.
-        /// </summary>
-        /// <param name="val">Int value.</param>
-        /// <param name="data">Data pointer.</param>
-        protected static void WriteInt0(int val, byte* data)
-        {
-            if (LittleEndian)
-                *((int*)data) = val;
-            else
-            {
-                byte* valPtr = (byte*)&val;
-
-                data[0] = valPtr[3];
-                data[1] = valPtr[2];
-                data[2] = valPtr[1];
-                data[3] = valPtr[0];
-            }
-        }
-
-        /// <summary>
-        /// Read int.
-        /// </summary>
-        /// <returns>
-        /// Int value.
-        /// </returns>
-        public abstract int ReadInt();
-
-        /// <summary>
-        /// Internal routine to read int value.
-        /// </summary>
-        /// <param name="data">Data pointer.</param>
-        /// <returns>Int value</returns>
-        protected static int ReadInt0(byte* data) {
-            int val;
-
-            if (LittleEndian)
-                val = *((int*)data);
-            else
-            {
-                byte* valPtr = (byte*)&val;
-
-                valPtr[0] = data[3];
-                valPtr[1] = data[2];
-                valPtr[2] = data[1];
-                valPtr[3] = data[0];
-            }
-            
-            return val;
-        }
-
-        /// <summary>
-        /// Write int array.
-        /// </summary>
-        /// <param name="val">Int array.</param>
-        public abstract void WriteIntArray(int[] val);
-
-        /// <summary>
-        /// Internal routine to write int array.
-        /// </summary>
-        /// <param name="val">Int array.</param>
-        /// <param name="data">Data pointer.</param>
-        /// <param name="cnt">Bytes count.</param>
-        protected static void WriteIntArray0(int[] val, byte* data, int cnt)
-        {
-            if (LittleEndian)
-            {
-                fixed (int* val0 = val)
-                {
-                    CopyMemory((byte*)val0, data, cnt);
-                }
-            }
-            else
-            {
-                byte* curPos = data;
-
-                for (int i = 0; i < val.Length; i++)
-                {
-                    int val0 = val[i];
-
-                    byte* valPtr = (byte*)&(val0);
-
-                    *curPos++ = valPtr[3];
-                    *curPos++ = valPtr[2];
-                    *curPos++ = valPtr[1];
-                    *curPos++ = valPtr[0];
-                }
-            }
-        }
-
-        /// <summary>
-        /// Read int array.
-        /// </summary>
-        /// <param name="cnt">Count.</param>
-        /// <returns>
-        /// Int array.
-        /// </returns>
-        public abstract int[] ReadIntArray(int cnt);
-
-        /// <summary>
-        /// Internal routine to read int array.
-        /// </summary>
-        /// <param name="len">Count.</param>
-        /// <param name="data">Data pointer.</param>
-        /// <param name="cnt">Bytes count.</param>
-        /// <returns>Int array</returns>
-        protected static int[] ReadIntArray0(int len, byte* data, int cnt)
-        {
-            int[] res = new int[len];
-
-            if (LittleEndian)
-            {
-                fixed (int* res0 = res)
-                {
-                    CopyMemory(data, (byte*)res0, cnt);
-                }
-            }
-            else
-            {
-                for (int i = 0; i < len; i++)
-                {
-                    int val;
-
-                    byte* valPtr = (byte*)&val;
-
-                    valPtr[3] = *data++;
-                    valPtr[2] = *data++;
-                    valPtr[1] = *data++;
-                    valPtr[0] = *data++;
-
-                    res[i] = val;
-                }
-            }
-
-            return res;
-        }
-
-        /// <summary>
-        /// Write float.
-        /// </summary>
-        /// <param name="val">Float value.</param>
-        public void WriteFloat(float val)
-        {
-            int val0 = *(int*)(&val);
-
-            WriteInt(val0);
-        }
-
-        /// <summary>
-        /// Read float.
-        /// </summary>
-        /// <returns>
-        /// Float value.
-        /// </returns>
-        public float ReadFloat()
-        {
-            int val = ReadInt();
-
-            return BinaryUtils.IntToFloatBits(val);
-        }
-
-        /// <summary>
-        /// Write float array.
-        /// </summary>
-        /// <param name="val">Float array.</param>
-        public abstract void WriteFloatArray(float[] val);
-
-        /// <summary>
-        /// Internal routine to write float array.
-        /// </summary>
-        /// <param name="val">Int array.</param>
-        /// <param name="data">Data pointer.</param>
-        /// <param name="cnt">Bytes count.</param>
-        protected static void WriteFloatArray0(float[] val, byte* data, int cnt)
-        {
-            if (LittleEndian)
-            {
-                fixed (float* val0 = val)
-                {
-                    CopyMemory((byte*)val0, data, cnt);
-                }
-            }
-            else
-            {
-                byte* curPos = data;
-
-                for (int i = 0; i < val.Length; i++)
-                {
-                    float val0 = val[i];
-
-                    byte* valPtr = (byte*)&(val0);
-
-                    *curPos++ = valPtr[3];
-                    *curPos++ = valPtr[2];
-                    *curPos++ = valPtr[1];
-                    *curPos++ = valPtr[0];
-                }
-            }
-        }
-
-        /// <summary>
-        /// Read float array.
-        /// </summary>
-        /// <param name="cnt">Count.</param>
-        /// <returns>
-        /// Float array.
-        /// </returns>
-        public abstract float[] ReadFloatArray(int cnt);
-
-        /// <summary>
-        /// Internal routine to read float array.
-        /// </summary>
-        /// <param name="len">Count.</param>
-        /// <param name="data">Data pointer.</param>
-        /// <param name="cnt">Bytes count.</param>
-        /// <returns>Float array</returns>
-        protected static float[] ReadFloatArray0(int len, byte* data, int cnt)
-        {
-            float[] res = new float[len];
-
-            if (LittleEndian)
-            {
-                fixed (float* res0 = res)
-                {
-                    CopyMemory(data, (byte*)res0, cnt);
-                }
-            }
-            else
-            {
-                for (int i = 0; i < len; i++)
-                {
-                    int val;
-
-                    byte* valPtr = (byte*)&val;
-
-                    valPtr[3] = *data++;
-                    valPtr[2] = *data++;
-                    valPtr[1] = *data++;
-                    valPtr[0] = *data++;
-
-                    res[i] = val;
-                }
-            }
-
-            return res;
-        }
-
-        /// <summary>
-        /// Write long.
-        /// </summary>
-        /// <param name="val">Long value.</param>
-        public abstract void WriteLong(long val);
-
-        /// <summary>
-        /// Internal routine to write long value.
-        /// </summary>
-        /// <param name="val">Long value.</param>
-        /// <param name="data">Data pointer.</param>
-        protected static void WriteLong0(long val, byte* data)
-        {
-            if (LittleEndian)
-                *((long*)data) = val;
-            else
-            {
-                byte* valPtr = (byte*)&val;
-
-                data[0] = valPtr[7];
-                data[1] = valPtr[6];
-                data[2] = valPtr[5];
-                data[3] = valPtr[4];
-                data[4] = valPtr[3];
-                data[5] = valPtr[2];
-                data[6] = valPtr[1];
-                data[7] = valPtr[0];
-            }
-        }
-
-        /// <summary>
-        /// Read long.
-        /// </summary>
-        /// <returns>
-        /// Long value.
-        /// </returns>
-        public abstract long ReadLong();
-
-        /// <summary>
-        /// Internal routine to read long value.
-        /// </summary>
-        /// <param name="data">Data pointer.</param>
-        /// <returns>Long value</returns>
-        protected static long ReadLong0(byte* data)
-        {
-            long val;
-
-            if (LittleEndian)
-                val = *((long*)data);
-            else
-            {
-                byte* valPtr = (byte*)&val;
-
-                valPtr[0] = data[7];
-                valPtr[1] = data[6];
-                valPtr[2] = data[5];
-                valPtr[3] = data[4];
-                valPtr[4] = data[3];
-                valPtr[5] = data[2];
-                valPtr[6] = data[1];
-                valPtr[7] = data[0];
-            }
-
-            return val;
-        }
-
-        /// <summary>
-        /// Write long array.
-        /// </summary>
-        /// <param name="val">Long array.</param>
-        public abstract void WriteLongArray(long[] val);
-
-        /// <summary>
-        /// Internal routine to write long array.
-        /// </summary>
-        /// <param name="val">Long array.</param>
-        /// <param name="data">Data pointer.</param>
-        /// <param name="cnt">Bytes count.</param>
-        protected static void WriteLongArray0(long[] val, byte* data, int cnt)
-        {
-            if (LittleEndian)
-            {
-                fixed (long* val0 = val)
-                {
-                    CopyMemory((byte*)val0, data, cnt);
-                }
-            }
-            else
-            {
-                byte* curPos = data;
-
-                for (int i = 0; i < val.Length; i++)
-                {
-                    long val0 = val[i];
-
-                    byte* valPtr = (byte*)&(val0);
-
-                    *curPos++ = valPtr[7];
-                    *curPos++ = valPtr[6];
-                    *curPos++ = valPtr[5];
-                    *curPos++ = valPtr[4];
-                    *curPos++ = valPtr[3];
-                    *curPos++ = valPtr[2];
-                    *curPos++ = valPtr[1];
-                    *curPos++ = valPtr[0];
-                }
-            }
-        }
-
-        /// <summary>
-        /// Read long array.
-        /// </summary>
-        /// <param name="cnt">Count.</param>
-        /// <returns>
-        /// Long array.
-        /// </returns>
-        public abstract long[] ReadLongArray(int cnt);
-
-        /// <summary>
-        /// Internal routine to read long array.
-        /// </summary>
-        /// <param name="len">Count.</param>
-        /// <param name="data">Data pointer.</param>
-        /// <param name="cnt">Bytes count.</param>
-        /// <returns>Long array</returns>
-        protected static long[] ReadLongArray0(int len, byte* data, int cnt)
-        {
-            long[] res = new long[len];
-
-            if (LittleEndian)
-            {
-                fixed (long* res0 = res)
-                {
-                    CopyMemory(data, (byte*)res0, cnt);
-                }
-            }
-            else
-            {
-                for (int i = 0; i < len; i++)
-                {
-                    long val;
-
-                    byte* valPtr = (byte*)&val;
-
-                    valPtr[7] = *data++;
-                    valPtr[6] = *data++;
-                    valPtr[5] = *data++;
-                    valPtr[4] = *data++;
-                    valPtr[3] = *data++;
-                    valPtr[2] = *data++;
-                    valPtr[1] = *data++;
-                    valPtr[0] = *data++;
-
-                    res[i] = val;
-                }
-            }
-
-            return res;
-        }
-
-        /// <summary>
-        /// Write double.
-        /// </summary>
-        /// <param name="val">Double value.</param>
-        public void WriteDouble(double val)
-        {
-            long val0 = *(long*)(&val);
-
-            WriteLong(val0);
-        }
-
-        /// <summary>
-        /// Read double.
-        /// </summary>
-        /// <returns>
-        /// Double value.
-        /// </returns>
-        public double ReadDouble()
-        {
-            long val = ReadLong();
-
-            return BinaryUtils.LongToDoubleBits(val);
-        }
-
-        /// <summary>
-        /// Write double array.
-        /// </summary>
-        /// <param name="val">Double array.</param>
-        public abstract void WriteDoubleArray(double[] val);
-
-        /// <summary>
-        /// Internal routine to write double array.
-        /// </summary>
-        /// <param name="val">Double array.</param>
-        /// <param name="data">Data pointer.</param>
-        /// <param name="cnt">Bytes count.</param>
-        protected static void WriteDoubleArray0(double[] val, byte* data, int cnt)
-        {
-            if (LittleEndian)
-            {
-                fixed (double* val0 = val)
-                {
-                    CopyMemory((byte*)val0, data, cnt);
-                }
-            }
-            else
-            {
-                byte* curPos = data;
-
-                for (int i = 0; i < val.Length; i++)
-                {
-                    double val0 = val[i];
-
-                    byte* valPtr = (byte*)&(val0);
-
-                    *curPos++ = valPtr[7];
-                    *curPos++ = valPtr[6];
-                    *curPos++ = valPtr[5];
-                    *curPos++ = valPtr[4];
-                    *curPos++ = valPtr[3];
-                    *curPos++ = valPtr[2];
-                    *curPos++ = valPtr[1];
-                    *curPos++ = valPtr[0];
-                }
-            }
-        }
-
-        /// <summary>
-        /// Read double array.
-        /// </summary>
-        /// <param name="cnt">Count.</param>
-        /// <returns>
-        /// Double array.
-        /// </returns>
-        public abstract double[] ReadDoubleArray(int cnt);
-
-        /// <summary>
-        /// Internal routine to read double array.
-        /// </summary>
-        /// <param name="len">Count.</param>
-        /// <param name="data">Data pointer.</param>
-        /// <param name="cnt">Bytes count.</param>
-        /// <returns>Double array</returns>
-        protected static double[] ReadDoubleArray0(int len, byte* data, int cnt)
-        {
-            double[] res = new double[len];
-
-            if (LittleEndian)
-            {
-                fixed (double* res0 = res)
-                {
-                    CopyMemory(data, (byte*)res0, cnt);
-                }
-            }
-            else
-            {
-                for (int i = 0; i < len; i++)
-                {
-                    double val;
-
-                    byte* valPtr = (byte*)&val;
-
-                    valPtr[7] = *data++;
-                    valPtr[6] = *data++;
-                    valPtr[5] = *data++;
-                    valPtr[4] = *data++;
-                    valPtr[3] = *data++;
-                    valPtr[2] = *data++;
-                    valPtr[1] = *data++;
-                    valPtr[0] = *data++;
-
-                    res[i] = val;
-                }
-            }
-
-            return res;
-        }
-
-        /// <summary>
-        /// Write string.
-        /// </summary>
-        /// <param name="chars">Characters.</param>
-        /// <param name="charCnt">Char count.</param>
-        /// <param name="byteCnt">Byte count.</param>
-        /// <param name="encoding">Encoding.</param>
-        /// <returns>
-        /// Amounts of bytes written.
-        /// </returns>
-        public abstract int WriteString(char* chars, int charCnt, int byteCnt, Encoding encoding);
-
-        /// <summary>
-        /// Write arbitrary data.
-        /// </summary>
-        /// <param name="src">Source array.</param>
-        /// <param name="off">Offset</param>
-        /// <param name="cnt">Count.</param>
-        public void Write(byte[] src, int off, int cnt)
-        {
-            fixed (byte* src0 = src)
-            {
-                Write(src0 + off, cnt);
-            }
-        }
-
-        /// <summary>
-        /// Read arbitrary data.
-        /// </summary>
-        /// <param name="dest">Destination array.</param>
-        /// <param name="off">Offset.</param>
-        /// <param name="cnt">Count.</param>
-        /// <returns>
-        /// Amount of bytes read.
-        /// </returns>
-        public void Read(byte[] dest, int off, int cnt)
-        {
-            fixed (byte* dest0 = dest)
-            {
-                Read(dest0 + off, cnt);
-            }
-        }
-
-        /// <summary>
-        /// Write arbitrary data.
-        /// </summary>
-        /// <param name="src">Source.</param>
-        /// <param name="cnt">Count.</param>
-        public abstract void Write(byte* src, int cnt);
-
-        /// <summary>
-        /// Internal write routine.
-        /// </summary>
-        /// <param name="src">Source.</param>
-        /// <param name="cnt">Count.</param>
-        /// <param name="data">Data (dsetination).</param>
-        protected void WriteInternal(byte* src, int cnt, byte* data)
-        {
-            CopyMemory(src, data + Pos, cnt);
-        }
-
-        /// <summary>
-        /// Read arbitrary data.
-        /// </summary>
-        /// <param name="dest">Destination.</param>
-        /// <param name="cnt">Count.</param>
-        /// <returns></returns>
-        public abstract void Read(byte* dest, int cnt);
-
-        /// <summary>
-        /// Internal read routine.
-        /// </summary>
-        /// <param name="src">Source</param>
-        /// <param name="dest">Destination.</param>
-        /// <param name="cnt">Count.</param>
-        /// <returns>Amount of bytes written.</returns>
-        protected void ReadInternal(byte* src, byte* dest, int cnt)
-        {
-            int cnt0 = Math.Min(Remaining, cnt);
-
-            CopyMemory(src + Pos, dest, cnt0);
-
-            ShiftRead(cnt0);
-        }
-
-        /// <summary>
-        /// Position.
-        /// </summary>
-        public int Position
-        {
-            get { return Pos; }
-        }
-
-        /// <summary>
-        /// Gets remaining bytes in the stream.
-        /// </summary>
-        /// <value>
-        ///     Remaining bytes.
-        /// </value>
-        public abstract int Remaining { get; }
-
-        /// <summary>
-        /// Gets underlying array, avoiding copying if possible.
-        /// </summary>
-        /// <returns>
-        /// Underlying array.
-        /// </returns>
-        public abstract byte[] GetArray();
-
-        /// <summary>
-        /// Gets underlying data in a new array.
-        /// </summary>
-        /// <returns>
-        /// New array with data.
-        /// </returns>
-        public abstract byte[] GetArrayCopy();
-
-        /// <summary>
-        /// Check whether array passed as argument is the same as the stream hosts.
-        /// </summary>
-        /// <param name="arr">Array.</param>
-        /// <returns>
-        ///   <c>True</c> if they are same.
-        /// </returns>
-        public abstract bool IsSameArray(byte[] arr);
-        
-        /// <summary>
-        /// Seek to the given position.
-        /// </summary>
-        /// <param name="offset">Offset.</param>
-        /// <param name="origin">Seek origin.</param>
-        /// <returns>
-        /// Position.
-        /// </returns>
-        /// <exception cref="System.ArgumentException">
-        /// Unsupported seek origin:  + origin
-        /// or
-        /// Seek before origin:  + newPos
-        /// </exception>
-        public int Seek(int offset, SeekOrigin origin)
-        {
-            int newPos;
-
-            switch (origin)
-            {
-                case SeekOrigin.Begin:
-                    {
-                        newPos = offset;
-
-                        break;
-                    }
-
-                case SeekOrigin.Current:
-                    {
-                        newPos = Pos + offset;
-
-                        break;
-                    }
-
-                default:
-                    throw new ArgumentException("Unsupported seek origin: " + origin);
-            }
-
-            if (newPos < 0)
-                throw new ArgumentException("Seek before origin: " + newPos);
-
-            EnsureWriteCapacity(newPos);
-
-            Pos = newPos;
-
-            return Pos;
-        }
-
-        /// <summary>
-        /// Returns a hash code for the specified byte range.
-        /// </summary>
-        public abstract T Apply<TArg, T>(IBinaryStreamProcessor<TArg, T> proc, TArg arg);
-
-        /// <summary>
-        /// Flushes the data to underlying storage.
-        /// </summary>
-        public void Flush()
-        {
-            // No-op.
-        }
-
-        /** <inheritdoc /> */
-        public void Dispose()
-        {
-            if (_disposed)
-                return;
-
-            Dispose(true);
-
-            GC.SuppressFinalize(this);
-
-            _disposed = true;
-        }
-
-        /// <summary>
-        /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
-        /// </summary>
-        protected abstract void Dispose(bool disposing);
-
-        /// <summary>
-        /// Ensure capacity for write.
-        /// </summary>
-        /// <param name="cnt">Bytes count.</param>
-        protected abstract void EnsureWriteCapacity(int cnt);
-
-        /// <summary>
-        /// Ensure capacity for write and shift position.
-        /// </summary>
-        /// <param name="cnt">Bytes count.</param>
-        /// <returns>Position before shift.</returns>
-        protected int EnsureWriteCapacityAndShift(int cnt)
-        {
-            int pos0 = Pos;
-
-            EnsureWriteCapacity(Pos + cnt);
-
-            ShiftWrite(cnt);
-
-            return pos0;
-        }
-
-        /// <summary>
-        /// Ensure capacity for read.
-        /// </summary>
-        /// <param name="cnt">Bytes count.</param>
-        protected abstract void EnsureReadCapacity(int cnt);
-
-        /// <summary>
-        /// Ensure capacity for read and shift position.
-        /// </summary>
-        /// <param name="cnt">Bytes count.</param>
-        /// <returns>Position before shift.</returns>
-        protected int EnsureReadCapacityAndShift(int cnt)
-        {
-            int pos0 = Pos;
-
-            EnsureReadCapacity(cnt);
-
-            ShiftRead(cnt);
-
-            return pos0;
-        }
-
-        /// <summary>
-        /// Shift position due to write
-        /// </summary>
-        /// <param name="cnt">Bytes count.</param>
-        protected void ShiftWrite(int cnt)
-        {
-            Pos += cnt;
-        }
-
-        /// <summary>
-        /// Shift position due to read.
-        /// </summary>
-        /// <param name="cnt">Bytes count.</param>
-        private void ShiftRead(int cnt)
-        {
-            Pos += cnt;
-        }
-
-        /// <summary>
-        /// Calculate new capacity.
-        /// </summary>
-        /// <param name="curCap">Current capacity.</param>
-        /// <param name="reqCap">Required capacity.</param>
-        /// <returns>New capacity.</returns>
-        protected static int Capacity(int curCap, int reqCap)
-        {
-            int newCap;
-
-            if (reqCap < 256)
-                newCap = 256;
-            else
-            {
-                newCap = curCap << 1;
-
-                if (newCap < reqCap)
-                    newCap = reqCap;
-            }
-
-            return newCap;
-        }
-
-        /// <summary>
-        /// Unsafe memory copy routine.
-        /// </summary>
-        /// <param name="src">Source.</param>
-        /// <param name="dest">Destination.</param>
-        /// <param name="len">Length.</param>
-        private static void CopyMemory(byte* src, byte* dest, int len)
-        {
-            PlatformMemoryUtils.CopyMemory(src, dest, len);
-        }
-    }
-}


[04/10] ignite git commit: ignite-5932

Posted by sb...@apache.org.
ignite-5932


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/af887544
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/af887544
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/af887544

Branch: refs/heads/ignite-5932
Commit: af887544099dd3e2f427b74a029f601ffddb4471
Parents: 1780062
Author: sboikov <sb...@gridgain.com>
Authored: Thu Oct 12 12:30:02 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Oct 12 12:30:02 2017 +0300

----------------------------------------------------------------------
 .../dht/GridPartitionedGetFuture.java           | 15 ++++++++--
 .../dht/GridPartitionedSingleGetFuture.java     | 21 ++++++++++----
 .../dht/atomic/GridDhtAtomicCache.java          |  6 ++--
 .../dht/colocated/GridDhtColocatedCache.java    | 30 +++++++++++++-------
 .../cache/distributed/near/GridNearTxLocal.java | 23 ++++++++++-----
 5 files changed, 68 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/af887544/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 7993d05..7689a4e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -78,6 +78,9 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
     private static IgniteLogger log;
 
     /** */
+    protected final MvccCoordinatorVersion mvccVer;
+
+    /** */
     private MvccQueryTracker mvccTracker;
 
     /**
@@ -94,6 +97,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
      * @param skipVals Skip values flag.
      * @param needVer If {@code true} returns values as tuples containing value and version.
      * @param keepCacheObjects Keep cache objects flag.
+     * @param mvccVer Mvcc version.
      */
     public GridPartitionedGetFuture(
         GridCacheContext<K, V> cctx,
@@ -107,7 +111,8 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
         @Nullable IgniteCacheExpiryPolicy expiryPlc,
         boolean skipVals,
         boolean needVer,
-        boolean keepCacheObjects
+        boolean keepCacheObjects,
+        @Nullable MvccCoordinatorVersion mvccVer
     ) {
         super(cctx,
             keys,
@@ -121,6 +126,9 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
             needVer,
             keepCacheObjects,
             recovery);
+        assert mvccVer == null || cctx.mvccEnabled();
+
+        this.mvccVer = mvccVer;
 
         if (log == null)
             log = U.logger(cctx.kernalContext(), logRef, GridPartitionedGetFuture.class);
@@ -133,6 +141,9 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
         if (!cctx.mvccEnabled())
             return null;
 
+        if (mvccVer != null)
+            return mvccVer;
+
         MvccCoordinatorVersion ver = mvccTracker.mvccVersion();
 
         assert ver != null : "[fut=" + this + ", mvccTracker=" + mvccTracker + "]";
@@ -158,7 +169,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
                 canRemap ? cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion();
         }
 
-        if (cctx.mvccEnabled()) {
+        if (cctx.mvccEnabled() && mvccVer == null) {
             mvccTracker = new MvccQueryTracker(cctx, canRemap, this);
 
             trackable = true;

http://git-wip-us.apache.org/repos/asf/ignite/blob/af887544/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index b34687f..afef744 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -41,11 +41,12 @@ import org.apache.ignite.internal.processors.cache.GridCacheFutureAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
@@ -122,6 +123,9 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
     @GridToStringInclude
     private ClusterNode node;
 
+    /** */
+    protected final MvccCoordinatorVersion mvccVer;
+
     /**
      * @param cctx Context.
      * @param key Key.
@@ -149,9 +153,11 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
         boolean skipVals,
         boolean needVer,
         boolean keepCacheObjects,
-        boolean recovery
+        boolean recovery,
+        @Nullable MvccCoordinatorVersion mvccVer
     ) {
         assert key != null;
+        assert mvccVer == null || cctx.mvccEnabled();
 
         AffinityTopologyVersion lockedTopVer = cctx.shared().lockedTopologyVersion(null);
 
@@ -176,6 +182,7 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
         this.keepCacheObjects = keepCacheObjects;
         this.recovery = recovery;
         this.topVer = topVer;
+        this.mvccVer = mvccVer;
 
         futId = IgniteUuid.randomUuid();
 
@@ -275,6 +282,7 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
                 cctx.mvcc().addFuture(this, futId);
             }
 
+            // TODO IGNITE-3478.
             GridCacheMessage req = new GridNearSingleGetRequest(cctx.cacheId(),
                 futId.localId(),
                 key,
@@ -355,7 +363,8 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
                 boolean skipEntry = readNoEntry;
 
                 if (readNoEntry) {
-                    CacheDataRow row = cctx.offheap().read(cctx, key); // TODO IGNITE-3478
+                    CacheDataRow row = mvccVer != null ? cctx.offheap().mvccRead(cctx, key, mvccVer) :
+                        cctx.offheap().read(cctx, key);
 
                     if (row != null) {
                         long expireTime = row.expireTime();
@@ -398,8 +407,8 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
                                 taskName,
                                 expiryPlc,
                                 true,
-                                null,
-                                null); // TODO IGNITE-3478
+                                mvccVer,
+                                null);
 
                             if (res != null) {
                                 v = res.value();
@@ -418,7 +427,7 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
                                 taskName,
                                 expiryPlc,
                                 true,
-                                null); // TODO IGNITE-3478
+                                mvccVer);
                         }
 
                         colocated.context().evicts().touch(entry, topVer);

http://git-wip-us.apache.org/repos/asf/ignite/blob/af887544/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 16416cc..d6862fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1385,7 +1385,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             skipVals,
             needVer,
             false,
-            recovery);
+            recovery,
+            null);
 
         fut.init();
 
@@ -1591,7 +1592,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             expiry,
             skipVals,
             needVer,
-            false);
+            false,
+            null);
 
         fut.init(topVer);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/af887544/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 7364cb3..c975edb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
@@ -241,7 +242,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
             skipVals,
             needVer,
             /*keepCacheObjects*/false,
-            opCtx != null && opCtx.recovery());
+            opCtx != null && opCtx.recovery(),
+            null);
 
         fut.init();
 
@@ -319,7 +321,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
      * @param needVer Need version.
      * @return Loaded values.
      */
-    public IgniteInternalFuture<Map<K, V>> loadAsync(
+    private IgniteInternalFuture<Map<K, V>> loadAsync(
         @Nullable Collection<KeyCacheObject> keys,
         boolean readThrough,
         boolean forcePrimary,
@@ -341,7 +343,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
             expiryPlc,
             skipVals,
             needVer,
-            false);
+            false,
+            null);
     }
 
     /**
@@ -370,7 +373,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
         boolean skipVals,
         boolean needVer,
         boolean keepCacheObj,
-        boolean recovery
+        boolean recovery,
+        @Nullable MvccCoordinatorVersion mvccVer
     ) {
         GridPartitionedSingleGetFuture fut = new GridPartitionedSingleGetFuture(ctx,
             ctx.toCacheKeyObject(key),
@@ -384,7 +388,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
             skipVals,
             needVer,
             keepCacheObj,
-            recovery);
+            recovery,
+            mvccVer);
 
         fut.init();
 
@@ -403,6 +408,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
      * @param skipVals Skip values flag.
      * @param needVer If {@code true} returns values as tuples containing value and version.
      * @param keepCacheObj Keep cache objects flag.
+     * @param mvccVer Mvcc version.
      * @return Load future.
      */
     public final IgniteInternalFuture<Map<K, V>> loadAsync(
@@ -417,8 +423,11 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
         @Nullable IgniteCacheExpiryPolicy expiryPlc,
         boolean skipVals,
         boolean needVer,
-        boolean keepCacheObj
+        boolean keepCacheObj,
+        @Nullable MvccCoordinatorVersion mvccVer
     ) {
+        assert mvccVer == null || ctx.mvccEnabled();
+
         if (keys == null || keys.isEmpty())
             return new GridFinishedFuture<>(Collections.<K, V>emptyMap());
 
@@ -426,7 +435,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
             expiryPlc = expiryPolicy(null);
 
         // Optimisation: try to resolve value locally and escape 'get future' creation.
-        if (!forcePrimary && ctx.affinityNode() && !ctx.mvccEnabled()) {
+        if (!forcePrimary && ctx.affinityNode() && (!ctx.mvccEnabled() || mvccVer != null)) {
             try {
                 Map<K, V> locVals = null;
 
@@ -499,7 +508,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
                                             taskName,
                                             expiryPlc,
                                             !deserializeBinary,
-                                            null,
+                                            mvccVer,
                                             null);
 
                                         if (getRes != null) {
@@ -519,7 +528,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
                                             taskName,
                                             expiryPlc,
                                             !deserializeBinary,
-                                            null);
+                                            mvccVer);
                                     }
 
                                     // Entry was not in memory or in swap, so we remove it from cache.
@@ -602,7 +611,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
             expiryPlc,
             skipVals,
             needVer,
-            keepCacheObj);
+            keepCacheObj,
+            mvccVer);
 
         fut.init(topVer);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/af887544/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index c8dfc9f..08f20de 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -1659,6 +1659,13 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
         }
     }
 
+    private MvccCoordinatorVersion mvccReadVersion(GridCacheContext cctx) {
+        if (!cctx.mvccEnabled() || mvccTracker == null)
+            return null;
+
+        return mvccTracker.mvccVersion();
+    }
+
     /**
      * @param cacheCtx Cache context.
      * @param keys Keys to get.
@@ -1830,8 +1837,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
                                             resolveTaskName(),
                                             null,
                                             txEntry.keepBinary(),
-                                            null,
-                                            null); // TODO IGNITE-3478
+                                            null, // TODO IGNITE-3478
+                                            null);
 
                                         if (getRes != null) {
                                             val = getRes.value();
@@ -2214,8 +2221,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
                                         resolveTaskName(),
                                         accessPlc,
                                         !deserializeBinary,
-                                        null,
-                                        null) : null; // TODO IGNITE-3478
+                                        mvccReadVersion(cacheCtx), // TODO IGNITE-3478
+                                        null) : null;
 
                                 if (getRes != null) {
                                     val = getRes.value();
@@ -2234,7 +2241,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
                                     resolveTaskName(),
                                     accessPlc,
                                     !deserializeBinary,
-                                    null); // TODO IGNITE-3478
+                                    mvccReadVersion(cacheCtx)); // TODO IGNITE-3478
                             }
 
                             if (val != null) {
@@ -2572,7 +2579,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
                     skipVals,
                     needVer,
                     /*keepCacheObject*/true,
-                    recovery
+                    recovery,
+                    mvccReadVersion(cacheCtx)
                 ).chain(new C1<IgniteInternalFuture<Object>, Void>() {
                     @Override public Void apply(IgniteInternalFuture<Object> f) {
                         try {
@@ -2603,7 +2611,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
                     expiryPlc0,
                     skipVals,
                     needVer,
-                    /*keepCacheObject*/true
+                    /*keepCacheObject*/true,
+                    mvccReadVersion(cacheCtx)
                 ).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Void>() {
                     @Override public Void apply(IgniteInternalFuture<Map<Object, Object>> f) {
                         try {


[07/10] ignite git commit: ignite-3478 Fixed query ack

Posted by sb...@apache.org.
ignite-3478 Fixed query ack


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f29d4bc5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f29d4bc5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f29d4bc5

Branch: refs/heads/ignite-5932
Commit: f29d4bc50801c530ef856d168fb637b0fad1c27b
Parents: 2374296
Author: sboikov <sb...@gridgain.com>
Authored: Thu Oct 12 13:43:07 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Oct 12 13:43:07 2017 +0300

----------------------------------------------------------------------
 .../cache/mvcc/CacheCoordinatorsProcessor.java          | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f29d4bc5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
index 9f9a7a3..85dde15 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
@@ -261,7 +261,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
                 }
 
                 crd = crdNode != null ? new
-                    MvccCoordinator(crdNode.id(), topVer, new AffinityTopologyVersion(topVer, 0)) : null;
+                    MvccCoordinator(crdNode.id(), coordinatorVersion(topVer), new AffinityTopologyVersion(topVer, 0)) : null;
 
                 if (crd != null)
                     log.info("Assigned mvcc coordinator [crd=" + crd + ", crdNode=" + crdNode +']');
@@ -274,6 +274,14 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param topVer Topology version.
+     * @return Coordinator version: the given topology version plus the grid start time reported by discovery.
+     */
+    private long coordinatorVersion(long topVer) {
+        return topVer + ctx.discovery().gridStartTime();
+    }
+
+    /**
      * @param log Logger.
      */
     public void dumpStatistics(IgniteLogger log) {
@@ -1022,7 +1030,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
         log.info("Initialize local node as mvcc coordinator [node=" + ctx.localNodeId() +
             ", topVer=" + topVer + ']');
 
-        crdVer = topVer.topologyVersion() + ctx.discovery().gridStartTime();
+        crdVer = coordinatorVersion(topVer.topologyVersion());
 
         prevCrdQueries.init(activeQueries, discoCache, ctx.discovery());
 


[08/10] ignite git commit: ignite-5932

Posted by sb...@apache.org.
ignite-5932


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d6670e8c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d6670e8c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d6670e8c

Branch: refs/heads/ignite-5932
Commit: d6670e8c80b225e29c04150f10a26c51de66b0f6
Parents: af88754
Author: sboikov <sb...@gridgain.com>
Authored: Thu Oct 12 13:43:49 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Oct 12 13:43:49 2017 +0300

----------------------------------------------------------------------
 .../communication/GridIoMessageFactory.java     |  22 +-
 .../near/GridNearTxFinishAndAckFuture.java      |  16 +-
 .../cache/distributed/near/GridNearTxLocal.java |   4 +
 .../cache/mvcc/CacheCoordinatorsProcessor.java  |  95 ++++++---
 .../cache/mvcc/CoordinatorAckRequestQuery.java  | 130 ++++++++++++
 .../cache/mvcc/CoordinatorAckRequestTx.java     | 201 +++++++++++++++++++
 .../mvcc/CoordinatorAckRequestTxAndQuery.java   | 120 +++++++++++
 .../mvcc/CoordinatorAckRequestTxAndQueryEx.java | 144 +++++++++++++
 .../cache/mvcc/CoordinatorQueryAckRequest.java  | 130 ------------
 .../cache/mvcc/CoordinatorTxAckRequest.java     | 194 ------------------
 .../processors/cache/mvcc/MvccCounter.java      |   2 +-
 .../processors/cache/mvcc/MvccQueryTracker.java |  37 ++++
 .../cache/mvcc/PreviousCoordinatorQueries.java  |  16 +-
 .../cache/mvcc/CacheMvccTransactionsTest.java   |   8 +-
 14 files changed, 747 insertions(+), 372 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 99bc8af..6a59c24 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -103,10 +103,12 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFi
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearUnlockRequest;
-import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorQueryAckRequest;
-import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorQueryVersionRequest;
-import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorTxAckRequest;
+import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorAckRequestTxAndQueryEx;
 import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorFutureResponse;
+import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorAckRequestQuery;
+import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorQueryVersionRequest;
+import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorAckRequestTx;
+import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorAckRequestTxAndQuery;
 import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorTxCounterRequest;
 import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorWaitTxsRequest;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersionResponse;
@@ -892,7 +894,7 @@ public class GridIoMessageFactory implements MessageFactory {
                 break;
 
             case 131: // TODO IGNITE-3478 fix constants.
-                msg = new CoordinatorTxAckRequest();
+                msg = new CoordinatorAckRequestTx();
 
                 break;
 
@@ -907,7 +909,7 @@ public class GridIoMessageFactory implements MessageFactory {
                 break;
 
             case 134:
-                msg = new CoordinatorQueryAckRequest();
+                msg = new CoordinatorAckRequestQuery();
 
                 break;
 
@@ -937,6 +939,16 @@ public class GridIoMessageFactory implements MessageFactory {
                 return msg;
 
             case 141:
+                msg = new CoordinatorAckRequestTxAndQuery();
+
+                return msg;
+
+            case 142:
+                msg = new CoordinatorAckRequestTxAndQueryEx();
+
+                return msg;
+
+            case 143:
                 msg = new MvccCounter();
 
                 return msg;

http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
index c24551b..5d8b77c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
 
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
 import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -53,12 +54,21 @@ public class GridNearTxFinishAndAckFuture extends GridFutureAdapter<IgniteIntern
                 @Override public void apply(final GridNearTxFinishFuture fut) {
                     GridNearTxLocal tx = fut.tx();
 
+                    IgniteInternalFuture<Void> ackFut = null;
+
+                    MvccQueryTracker qryTracker = tx.mvccQueryTracker();
+
                     TxMvccInfo mvccInfo = tx.mvccInfo();
 
-                    if (mvccInfo != null) {
-                        IgniteInternalFuture<Void> ackFut = fut.context().coordinators().ackTxCommit(
-                            mvccInfo.coordinator(), mvccInfo.version());
+                    if (qryTracker != null)
+                        ackFut = qryTracker.onTxFinish(mvccInfo, fut.context());
+                    else if (mvccInfo != null) {
+                        ackFut = fut.context().coordinators().ackTxCommit(mvccInfo.coordinator(),
+                            mvccInfo.version(),
+                            null);
+                    }
 
+                    if (ackFut != null) {
                         ackFut.listen(new IgniteInClosure<IgniteInternalFuture<Void>>() {
                             @Override public void apply(IgniteInternalFuture<Void> ackFut) {
                                 Exception err = null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 08f20de..c774f93 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -237,6 +237,10 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
             trackTimeout = cctx.time().addTimeoutObject(this);
     }
 
+    MvccQueryTracker mvccQueryTracker() {
+        return mvccTracker;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean near() {
         return true;

http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
index b89ce73..59eae1b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
@@ -176,7 +176,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
 
         statCntrs[0] = new CounterWithAvg("CoordinatorTxCounterRequest", "avgTxs");
         statCntrs[1] = new CounterWithAvg("MvccCoordinatorVersionResponse", "avgFutTime");
-        statCntrs[2] = new StatCounter("CoordinatorTxAckRequest");
+        statCntrs[2] = new StatCounter("CoordinatorAckRequestTx");
         statCntrs[3] = new CounterWithAvg("CoordinatorTxAckResponse", "avgFutTime");
         statCntrs[4] = new StatCounter("TotalRequests");
         statCntrs[5] = new StatCounter("CoordinatorWaitTxsRequest");
@@ -331,20 +331,9 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
     public void ackQueryDone(MvccCoordinator crd, MvccCoordinatorVersion mvccVer) {
         assert crd != null;
 
-        long trackCntr = mvccVer.counter();
-
-        MvccLongList txs = mvccVer.activeTransactions();
+        long trackCntr = queryTrackCounter(mvccVer);
 
-        if (txs != null) {
-            for (int i = 0; i < txs.size(); i++) {
-                long txId = txs.get(i);
-
-                if (txId < trackCntr)
-                    trackCntr = txId;
-            }
-        }
-
-        Message msg = crd.coordinatorVersion() == mvccVer.coordinatorVersion() ? new CoordinatorQueryAckRequest(trackCntr) :
+        Message msg = crd.coordinatorVersion() == mvccVer.coordinatorVersion() ? new CoordinatorAckRequestQuery(trackCntr) :
             new NewCoordinatorQueryAckRequest(mvccVer.coordinatorVersion(), trackCntr);
 
         try {
@@ -363,6 +352,27 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param mvccVer Read version.
+     * @return Smallest value among the version counter and the entries of its active transactions list.
+     */
+    private long queryTrackCounter(MvccCoordinatorVersion mvccVer) {
+        long trackCntr = mvccVer.counter();
+
+        MvccLongList txs = mvccVer.activeTransactions();
+
+        int size = txs.size();
+
+        for (int i = 0; i < size; i++) {
+            long txId = txs.get(i);
+
+            if (txId < trackCntr)
+                trackCntr = txId;
+        }
+
+        return trackCntr;
+    }
+
+    /**
      * @param crd Coordinator.
      * @return Counter request future.
      */
@@ -422,22 +432,42 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
 
     /**
      * @param crd Coordinator.
-     * @param mvccVer Transaction version.
+     * @param updateVer Transaction update version.
+     * @param readVer Transaction read version.
      * @return Acknowledge future.
      */
-    public IgniteInternalFuture<Void> ackTxCommit(UUID crd, MvccCoordinatorVersion mvccVer) {
+    public IgniteInternalFuture<Void> ackTxCommit(UUID crd,
+        MvccCoordinatorVersion updateVer,
+        @Nullable MvccCoordinatorVersion readVer) {
         assert crd != null;
-        assert mvccVer != null;
+        assert updateVer != null;
 
         WaitAckFuture fut = new WaitAckFuture(futIdCntr.incrementAndGet(), crd, true);
 
         ackFuts.put(fut.id, fut);
 
+        MvccCoordinatorMessage msg;
+
+        if (readVer != null) {
+            long trackCntr = queryTrackCounter(readVer);
+
+            if (readVer.coordinatorVersion() == updateVer.coordinatorVersion()) {
+                msg = new CoordinatorAckRequestTxAndQuery(fut.id,
+                    updateVer.counter(),
+                    trackCntr);
+            }
+            else {
+                msg = new CoordinatorAckRequestTxAndQueryEx(fut.id,
+                    updateVer.counter(),
+                    readVer.coordinatorVersion(),
+                    trackCntr);
+            }
+        }
+        else
+            msg = new CoordinatorAckRequestTx(fut.id, updateVer.counter());
+
         try {
-            ctx.io().sendToGridTopic(crd,
-                MSG_TOPIC,
-                new CoordinatorTxAckRequest(fut.id, mvccVer.counter()),
-                MSG_POLICY);
+            ctx.io().sendToGridTopic(crd, MSG_TOPIC, msg, MSG_POLICY);
         }
         catch (IgniteCheckedException e) {
             if (ackFuts.remove(fut.id) != null) {
@@ -456,7 +486,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
      * @param mvccVer Transaction version.
      */
     public void ackTxRollback(UUID crdId, MvccCoordinatorVersion mvccVer) {
-        CoordinatorTxAckRequest msg = new CoordinatorTxAckRequest(0, mvccVer.counter());
+        CoordinatorAckRequestTx msg = new CoordinatorAckRequestTx(0, mvccVer.counter());
 
         msg.skipResponse(true);
 
@@ -568,7 +598,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
      * @param nodeId Node ID.
      * @param msg Message.
      */
-    private void processCoordinatorQueryAckRequest(UUID nodeId, CoordinatorQueryAckRequest msg) {
+    private void processCoordinatorQueryAckRequest(UUID nodeId, CoordinatorAckRequestQuery msg) {
         onQueryDone(nodeId, msg.counter());
     }
 
@@ -577,16 +607,23 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
      * @param msg Message.
      */
     private void processNewCoordinatorQueryAckRequest(UUID nodeId, NewCoordinatorQueryAckRequest msg) {
-        prevCrdQueries.onQueryDone(nodeId, msg);
+        prevCrdQueries.onQueryDone(nodeId, msg.coordinatorVersion(), msg.counter());
     }
 
     /**
      * @param nodeId Sender node ID.
      * @param msg Message.
      */
-    private void processCoordinatorTxAckRequest(UUID nodeId, CoordinatorTxAckRequest msg) {
+    private void processCoordinatorTxAckRequest(UUID nodeId, CoordinatorAckRequestTx msg) {
         onTxDone(msg.txCounter());
 
+        if (msg.queryCounter() != COUNTER_NA) {
+            if (msg.queryCoordinatorVersion() == 0)
+                onQueryDone(nodeId, msg.queryCounter());
+            else
+                prevCrdQueries.onQueryDone(nodeId, msg.queryCoordinatorVersion(), msg.queryCounter());
+        }
+
         if (STAT_CNTRS)
             statCntrs[2].update();
 
@@ -1238,12 +1275,12 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
 
             if (msg instanceof CoordinatorTxCounterRequest)
                 processCoordinatorTxCounterRequest(nodeId, (CoordinatorTxCounterRequest)msg);
-            else if (msg instanceof CoordinatorTxAckRequest)
-                processCoordinatorTxAckRequest(nodeId, (CoordinatorTxAckRequest)msg);
+            else if (msg instanceof CoordinatorAckRequestTx)
+                processCoordinatorTxAckRequest(nodeId, (CoordinatorAckRequestTx)msg);
             else if (msg instanceof CoordinatorFutureResponse)
                 processCoordinatorAckResponse(nodeId, (CoordinatorFutureResponse)msg);
-            else if (msg instanceof CoordinatorQueryAckRequest)
-                processCoordinatorQueryAckRequest(nodeId, (CoordinatorQueryAckRequest)msg);
+            else if (msg instanceof CoordinatorAckRequestQuery)
+                processCoordinatorQueryAckRequest(nodeId, (CoordinatorAckRequestQuery)msg);
             else if (msg instanceof CoordinatorQueryVersionRequest)
                 processCoordinatorQueryVersionRequest(nodeId, (CoordinatorQueryVersionRequest)msg);
             else if (msg instanceof MvccCoordinatorVersionResponse)

http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestQuery.java
new file mode 100644
index 0000000..e51ec90
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestQuery.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Coordinator message that carries a single query counter (direct type 134).
+ */
+public class CoordinatorAckRequestQuery implements MvccCoordinatorMessage {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Query counter. */
+    private long cntr;
+
+    /**
+     * Required by {@link GridIoMessageFactory}.
+     */
+    public CoordinatorAckRequestQuery() {
+        // No-op.
+    }
+
+    /**
+     * @param cntr Query counter.
+     */
+    CoordinatorAckRequestQuery(long cntr) {
+        this.cntr = cntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean waitForCoordinatorInit() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean processedFromNioThread() {
+        return true;
+    }
+
+    /**
+     * @return Query counter.
+     */
+    public long counter() {
+        return cntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeLong("cntr", cntr))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                cntr = reader.readLong("cntr");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(CoordinatorAckRequestQuery.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 134;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CoordinatorAckRequestQuery.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java
new file mode 100644
index 0000000..a3904fb
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Coordinator message that carries a future ID, a transaction counter and a flags byte (direct type 131).
+ */
+public class CoordinatorAckRequestTx implements MvccCoordinatorMessage {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Bit of {@link #flags} that is set when a response message is not needed. */
+    private static final int SKIP_RESPONSE_FLAG_MASK = 0x01;
+
+    /** Future ID. */
+    private long futId;
+
+    /** Counter assigned to transaction. */
+    private long txCntr;
+
+    /** Flags byte, see {@link #SKIP_RESPONSE_FLAG_MASK}. */
+    private byte flags;
+
+    /**
+     * Required by {@link GridIoMessageFactory}.
+     */
+    public CoordinatorAckRequestTx() {
+        // No-op.
+    }
+
+    /**
+     * @param futId Future ID.
+     * @param txCntr Counter assigned to transaction.
+     */
+    CoordinatorAckRequestTx(long futId, long txCntr) {
+        this.futId = futId;
+        this.txCntr = txCntr;
+    }
+
+    long queryCounter() {
+        return CacheCoordinatorsProcessor.COUNTER_NA;
+    }
+
+    long queryCoordinatorVersion() {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean waitForCoordinatorInit() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean processedFromNioThread() {
+        return true;
+    }
+
+    /**
+     * @return Future ID.
+     */
+    long futureId() {
+        return futId;
+    }
+
+    /**
+     * @return {@code True} if response message is not needed.
+     */
+    boolean skipResponse() {
+        return (flags & SKIP_RESPONSE_FLAG_MASK) != 0;
+    }
+
+    /**
+     * @param val {@code True} if response message is not needed.
+     */
+    void skipResponse(boolean val) {
+        if (val)
+            flags |= SKIP_RESPONSE_FLAG_MASK;
+        else
+            flags &= ~SKIP_RESPONSE_FLAG_MASK;
+    }
+
+    /**
+     * @return Counter assigned to transaction.
+     */
+    public long txCounter() {
+        return txCntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeByte("flags", flags))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeLong("futId", futId))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeLong("txCntr", txCntr))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                flags = reader.readByte("flags");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                futId = reader.readLong("futId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
+                txCntr = reader.readLong("txCntr");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(CoordinatorAckRequestTx.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 131;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 3;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CoordinatorAckRequestTx.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQuery.java
new file mode 100644
index 0000000..91f27b2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQuery.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * {@link CoordinatorAckRequestTx} extended with a counter assigned for transaction reads (direct type 141).
+ */
+public class CoordinatorAckRequestTxAndQuery extends CoordinatorAckRequestTx {
+    /** Counter assigned for transaction reads. */
+    private long qryCntr;
+
+    /**
+     * Required by {@link GridIoMessageFactory}.
+     */
+    public CoordinatorAckRequestTxAndQuery() {
+        // No-op.
+    }
+
+    /**
+     * @param futId Future ID.
+     * @param txCntr Counter assigned to transaction update.
+     * @param qryCntr Counter assigned for transaction reads.
+     */
+    CoordinatorAckRequestTxAndQuery(long futId, long txCntr, long qryCntr) {
+        super(futId, txCntr);
+
+        this.qryCntr = qryCntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override long queryCounter() {
+        return qryCntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 3:
+                if (!writer.writeLong("qryCntr", qryCntr))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 3:
+                qryCntr = reader.readLong("qryCntr");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(CoordinatorAckRequestTxAndQuery.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 141;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 4;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CoordinatorAckRequestTxAndQuery.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQueryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQueryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQueryEx.java
new file mode 100644
index 0000000..1808697
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQueryEx.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * {@link CoordinatorAckRequestTx} extended with a query counter and the version of the coordinator that assigned it.
+ */
+public class CoordinatorAckRequestTxAndQueryEx extends CoordinatorAckRequestTx {
+    /** Version of coordinator assigned read counter. */
+    private long qryCrdVer;
+
+    /** Counter assigned for transaction reads. */
+    private long qryCntr;
+
+    /**
+     * Required by {@link GridIoMessageFactory}.
+     */
+    public CoordinatorAckRequestTxAndQueryEx() {
+        // No-op.
+    }
+
+    /**
+     * @param futId Future ID.
+     * @param txCntr Counter assigned to transaction update.
+     * @param qryCrdVer Version of coordinator assigned read counter.
+     * @param qryCntr Counter assigned for transaction reads.
+     */
+    CoordinatorAckRequestTxAndQueryEx(long futId, long txCntr, long qryCrdVer, long qryCntr) {
+        super(futId, txCntr);
+
+        this.qryCrdVer = qryCrdVer;
+        this.qryCntr = qryCntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override long queryCoordinatorVersion() {
+        return qryCrdVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override long queryCounter() {
+        return qryCntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer)) // Superclass writes its part first; bail out when it reports false.
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) { // No breaks: starting from state 3 both own fields are written in order.
+            case 3:
+                if (!writer.writeLong("qryCntr", qryCntr))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeLong("qryCrdVer", qryCrdVer))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader)) // Superclass reads its part first; bail out when it reports false.
+            return false;
+
+        switch (reader.state()) { // Mirrors writeTo: qryCntr at state 3, then qryCrdVer at state 4.
+            case 3:
+                qryCntr = reader.readLong("qryCntr");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                qryCrdVer = reader.readLong("qryCrdVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(CoordinatorAckRequestTxAndQueryEx.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 142;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 5; // Own fields use states 3 and 4; lower states presumably belong to the superclass - confirm.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CoordinatorAckRequestTxAndQueryEx.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryAckRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryAckRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryAckRequest.java
deleted file mode 100644
index 602d3b4..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryAckRequest.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.mvcc;
-
-import java.nio.ByteBuffer;
-import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- *
- */
-public class CoordinatorQueryAckRequest implements MvccCoordinatorMessage {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private long cntr;
-
-    /**
-     * Required by {@link GridIoMessageFactory}.
-     */
-    public CoordinatorQueryAckRequest() {
-        // No-op.
-    }
-
-    /**
-     * @param cntr Query counter.
-     */
-    CoordinatorQueryAckRequest(long cntr) {
-        this.cntr = cntr;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean waitForCoordinatorInit() {
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean processedFromNioThread() {
-        return true;
-    }
-
-    /**
-     * @return Counter.
-     */
-    public long counter() {
-        return cntr;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType(), fieldsCount()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 0:
-                if (!writer.writeLong("cntr", cntr))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        if (!reader.beforeMessageRead())
-            return false;
-
-        switch (reader.state()) {
-            case 0:
-                cntr = reader.readLong("cntr");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-        }
-
-        return reader.afterMessageRead(CoordinatorQueryAckRequest.class);
-    }
-
-    /** {@inheritDoc} */
-    @Override public short directType() {
-        return 134;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte fieldsCount() {
-        return 1;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onAckReceived() {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(CoordinatorQueryAckRequest.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java
deleted file mode 100644
index 14cd6a9..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.mvcc;
-
-import java.nio.ByteBuffer;
-import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- *
- */
-public class CoordinatorTxAckRequest implements MvccCoordinatorMessage {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private static final int SKIP_RESPONSE_FLAG_MASK = 0x01;
-
-    /** */
-    private long futId;
-
-    /** */
-    private long txCntr;
-
-    /** */
-    private byte flags;
-
-    /**
-     * Required by {@link GridIoMessageFactory}.
-     */
-    public CoordinatorTxAckRequest() {
-        // No-op.
-    }
-
-    /**
-     * @param futId Future ID.
-     * @param txCntr Counter assigned to transaction.
-     */
-    CoordinatorTxAckRequest(long futId, long txCntr) {
-        this.futId = futId;
-        this.txCntr = txCntr;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean waitForCoordinatorInit() {
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean processedFromNioThread() {
-        return true;
-    }
-
-    /**
-     * @return Future ID.
-     */
-    long futureId() {
-        return futId;
-    }
-
-    /**
-     * @return {@code True} if response message is not needed.
-     */
-    boolean skipResponse() {
-        return (flags & SKIP_RESPONSE_FLAG_MASK) != 0;
-    }
-
-    /**
-     * @param val {@code True} if response message is not needed.
-     */
-    void skipResponse(boolean val) {
-        if (val)
-            flags |= SKIP_RESPONSE_FLAG_MASK;
-        else
-            flags &= ~SKIP_RESPONSE_FLAG_MASK;
-    }
-
-    /**
-     * @return Counter assigned tp transaction.
-     */
-    public long txCounter() {
-        return txCntr;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType(), fieldsCount()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 0:
-                if (!writer.writeByte("flags", flags))
-                    return false;
-
-                writer.incrementState();
-
-            case 1:
-                if (!writer.writeLong("futId", futId))
-                    return false;
-
-                writer.incrementState();
-
-            case 2:
-                if (!writer.writeLong("txCntr", txCntr))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        if (!reader.beforeMessageRead())
-            return false;
-
-        switch (reader.state()) {
-            case 0:
-                flags = reader.readByte("flags");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 1:
-                futId = reader.readLong("futId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 2:
-                txCntr = reader.readLong("txCntr");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-        }
-
-        return reader.afterMessageRead(CoordinatorTxAckRequest.class);
-    }
-
-    /** {@inheritDoc} */
-    @Override public short directType() {
-        return 131;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte fieldsCount() {
-        return 3;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onAckReceived() {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(CoordinatorTxAckRequest.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java
index bec3301..33407b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java
@@ -143,7 +143,7 @@ public class MvccCounter implements Message {
 
     /** {@inheritDoc} */
     @Override public short directType() {
-        return 141;
+        return 143;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
index 8c421fc..e45b77c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteInClosure;
@@ -115,6 +116,42 @@ public class MvccQueryTracker {
             cctx.shared().coordinators().ackQueryDone(mvccCrd0, mvccVer0);
     }
 
+    public IgniteInternalFuture<Void> onTxFinish(@Nullable TxMvccInfo mvccInfo, GridCacheSharedContext ctx) {
+        MvccCoordinator mvccCrd0 = null;
+        MvccCoordinatorVersion mvccVer0 = null;
+        // Finishes the tracked query (if any) and acks tx commit (if mvccInfo != null); may return null, not a future.
+        synchronized (this) {
+            if (mvccVer != null) {
+                assert mvccCrd != null;
+
+                mvccCrd0 = mvccCrd;
+                mvccVer0 = mvccVer;
+
+                mvccVer = null; // Mark as finished.
+            }
+        }
+        // mvccVer0 != null: a query version was still tracked and has just been marked finished above.
+        if (mvccVer0 != null) {
+            if (mvccInfo == null) {
+                cctx.shared().coordinators().ackQueryDone(mvccCrd0, mvccVer0);
+
+                return null;
+            }
+            else if (mvccInfo.coordinator().equals(mvccCrd0.nodeId()))
+                return ctx.coordinators().ackTxCommit(mvccInfo.coordinator(), mvccInfo.version(), mvccVer0);
+            else {
+                cctx.shared().coordinators().ackQueryDone(mvccCrd0, mvccVer0);
+                // NOTE(review): acks go through both cctx.shared() and the ctx argument - confirm they are the same.
+                return ctx.coordinators().ackTxCommit(mvccInfo.coordinator(), mvccInfo.version(), null);
+            }
+        }
+        // No tracked query version: only the tx commit (if any) has to be acked.
+        if (mvccInfo != null)
+            return ctx.coordinators().ackTxCommit(mvccInfo.coordinator(), mvccInfo.version(), null);
+
+        return null;
+    }
+
     /**
      * @param topVer Topology version.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
index 700b27d..667865b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
@@ -158,23 +158,27 @@ class PreviousCoordinatorQueries {
 
     /**
      * @param nodeId Node ID.
-     * @param msg Message.
+     * @param crdVer Coordinator version.
+     * @param cntr Counter.
      */
-    void onQueryDone(UUID nodeId, NewCoordinatorQueryAckRequest msg) {
+    void onQueryDone(UUID nodeId, long crdVer, long cntr) {
+        assert crdVer != 0;
+        assert cntr != CacheCoordinatorsProcessor.COUNTER_NA;
+
         synchronized (this) {
-            MvccCounter cntr = new MvccCounter(msg.coordinatorVersion(), msg.counter());
+            MvccCounter mvccCntr = new MvccCounter(crdVer, cntr);
 
             Map<MvccCounter, Integer> nodeQueries = activeQueries.get(nodeId);
 
             if (nodeQueries == null)
                 activeQueries.put(nodeId, nodeQueries = new HashMap<>());
 
-            Integer qryCnt = nodeQueries.get(cntr);
+            Integer qryCnt = nodeQueries.get(mvccCntr);
 
             int newQryCnt = (qryCnt != null ? qryCnt : 0) - 1;
 
             if (newQryCnt == 0) {
-                nodeQueries.remove(cntr);
+                nodeQueries.remove(mvccCntr);
 
                 if (nodeQueries.isEmpty()) {
                     activeQueries.remove(nodeId);
@@ -184,7 +188,7 @@ class PreviousCoordinatorQueries {
                 }
             }
             else
-                nodeQueries.put(cntr, newQryCnt);
+                nodeQueries.put(mvccCntr, newQryCnt);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index 82201ea..8964cd4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -655,7 +655,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
             boolean block = true;
 
             @Override public boolean apply(ClusterNode node, Message msg) {
-                if (block && msg instanceof CoordinatorTxAckRequest) {
+                if (block && msg instanceof CoordinatorAckRequestTx) {
                     block = false;
 
                     return true;
@@ -991,7 +991,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
 
         clientSpi.closure(new IgniteBiInClosure<ClusterNode, Message>() {
             @Override public void apply(ClusterNode node, Message msg) {
-                if (msg instanceof CoordinatorTxAckRequest)
+                if (msg instanceof CoordinatorAckRequestTx)
                     doSleep(2000);
             }
         });
@@ -1110,7 +1110,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
             private boolean blocked;
 
             @Override public boolean apply(ClusterNode node, Message msg) {
-                if (!blocked && (msg instanceof CoordinatorTxAckRequest)) {
+                if (!blocked && (msg instanceof CoordinatorAckRequestTx)) {
                     blocked = true;
 
                     return true;
@@ -2055,7 +2055,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
 
         srvSpi.blockMessages(GridNearGetResponse.class, getTestIgniteInstanceName(1));
 
-        TestRecordingCommunicationSpi.spi(client).blockMessages(CoordinatorQueryAckRequest.class,
+        TestRecordingCommunicationSpi.spi(client).blockMessages(CoordinatorAckRequestQuery.class,
             getTestIgniteInstanceName(0));
 
         IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {