You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/10/12 10:43:23 UTC

[1/6] ignite git commit: IGNITE-6545: Failure during Ignite Service.cancel() can break normal shutdown process. This closes #2807.

Repository: ignite
Updated Branches:
  refs/heads/ignite-3478 970cf47a5 -> f29d4bc50


IGNITE-6545: Failure during Ignite Service.cancel() can break normal shutdown process. This closes #2807.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8ffa1099
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8ffa1099
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8ffa1099

Branch: refs/heads/ignite-3478
Commit: 8ffa1099e2afd14052f7c91be822b2aa3f5f2a8d
Parents: 0f3546a
Author: AMRepo <an...@gmail.com>
Authored: Tue Oct 10 11:57:20 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Wed Oct 11 12:00:26 2017 +0300

----------------------------------------------------------------------
 .../processors/service/GridServiceProcessor.java         | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8ffa1099/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 9272760..6f1dfc7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -316,7 +316,16 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
             Service svc = ctx.service();
 
             if (svc != null)
-                svc.cancel(ctx);
+                try {
+                    svc.cancel(ctx);
+                }
+                catch (Throwable e) {
+                    log.error("Failed to cancel service (ignoring) [name=" + ctx.name() +
+                        ", execId=" + ctx.executionId() + ']', e);
+
+                    if (e instanceof Error)
+                        throw e;
+                }
 
             ctx.executor().shutdownNow();
         }


[6/6] ignite git commit: ignite-3478 Fixed query ack

Posted by sb...@apache.org.
ignite-3478 Fixed query ack


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f29d4bc5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f29d4bc5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f29d4bc5

Branch: refs/heads/ignite-3478
Commit: f29d4bc50801c530ef856d168fb637b0fad1c27b
Parents: 2374296
Author: sboikov <sb...@gridgain.com>
Authored: Thu Oct 12 13:43:07 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Oct 12 13:43:07 2017 +0300

----------------------------------------------------------------------
 .../cache/mvcc/CacheCoordinatorsProcessor.java          | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f29d4bc5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
index 9f9a7a3..85dde15 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
@@ -261,7 +261,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
                 }
 
                 crd = crdNode != null ? new
-                    MvccCoordinator(crdNode.id(), topVer, new AffinityTopologyVersion(topVer, 0)) : null;
+                    MvccCoordinator(crdNode.id(), coordinatorVersion(topVer), new AffinityTopologyVersion(topVer, 0)) : null;
 
                 if (crd != null)
                     log.info("Assigned mvcc coordinator [crd=" + crd + ", crdNode=" + crdNode +']');
@@ -274,6 +274,14 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param topVer Topology version.
+     * @return Coordinator version.
+     */
+    private long coordinatorVersion(long topVer) {
+        return topVer + ctx.discovery().gridStartTime();
+    }
+
+    /**
      * @param log Logger.
      */
     public void dumpStatistics(IgniteLogger log) {
@@ -1022,7 +1030,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
         log.info("Initialize local node as mvcc coordinator [node=" + ctx.localNodeId() +
             ", topVer=" + topVer + ']');
 
-        crdVer = topVer.topologyVersion() + ctx.discovery().gridStartTime();
+        crdVer = coordinatorVersion(topVer.topologyVersion());
 
         prevCrdQueries.init(activeQueries, discoCache, ctx.discovery());
 


[3/6] ignite git commit: IGNITE-6536 Node fails when detects mapping storage corruption

Posted by sb...@apache.org.
IGNITE-6536 Node fails when detects mapping storage corruption

Signed-off-by: Andrey Gura <ag...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5490c7d9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5490c7d9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5490c7d9

Branch: refs/heads/ignite-3478
Commit: 5490c7d927daca2d64f108b6c2eafe03bbd6f54e
Parents: b0158fb
Author: Sergey Chugunov <se...@gmail.com>
Authored: Wed Oct 11 15:33:23 2017 +0300
Committer: Andrey Gura <ag...@apache.org>
Committed: Wed Oct 11 15:34:06 2017 +0300

----------------------------------------------------------------------
 .../internal/MarshallerMappingFileStore.java    | 15 +++--
 .../IgniteMarshallerCacheFSRestoreTest.java     | 71 +++++++++++++++++++-
 2 files changed, 77 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5490c7d9/modules/core/src/main/java/org/apache/ignite/internal/MarshallerMappingFileStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerMappingFileStore.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerMappingFileStore.java
index eabbdb8..59a99b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerMappingFileStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerMappingFileStore.java
@@ -167,16 +167,19 @@ final class MarshallerMappingFileStore {
 
             try (FileInputStream in = new FileInputStream(file)) {
                 try (BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
-                    String className = reader.readLine();
+                    String clsName = reader.readLine();
 
-                    marshCtx.registerClassNameLocally(platformId, typeId, className);
+                    if (clsName == null) {
+                        throw new IgniteCheckedException("Class name is null for [platformId=" + platformId +
+                            ", typeId=" + typeId + "], marshaller mappings storage is broken. " +
+                            "Clean up marshaller directory (<work_dir>/marshaller) and restart the node.");
+                    }
+
+                    marshCtx.registerClassNameLocally(platformId, typeId, clsName);
                 }
             }
             catch (IOException e) {
-                throw new IgniteCheckedException("Reading marshaller mapping from file "
-                    + name
-                    + " failed."
-                    , e);
+                throw new IgniteCheckedException("Reading marshaller mapping from file " + name + " failed.", e);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5490c7d9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheFSRestoreTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheFSRestoreTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheFSRestoreTest.java
index 21a3e43..ac15971 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheFSRestoreTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheFSRestoreTest.java
@@ -23,13 +23,16 @@ import java.io.Writer;
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
 import java.util.Map;
+import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.PersistentStoreConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.processors.marshaller.MappingProposedMessage;
@@ -47,6 +50,9 @@ public class IgniteMarshallerCacheFSRestoreTest extends GridCommonAbstractTest {
     /** */
     private volatile boolean isDuplicateObserved = true;
 
+    /** */
+    private boolean isPersistenceEnabled;
+
     /**
      *
      */
@@ -67,6 +73,7 @@ public class IgniteMarshallerCacheFSRestoreTest extends GridCommonAbstractTest {
         }
     }
 
+    /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
 
@@ -75,13 +82,17 @@ public class IgniteMarshallerCacheFSRestoreTest extends GridCommonAbstractTest {
 
         cfg.setDiscoverySpi(discoSpi);
 
-        CacheConfiguration singleCacheConfig = new CacheConfiguration()
+        CacheConfiguration singleCacheCfg = new CacheConfiguration()
             .setName(DEFAULT_CACHE_NAME)
             .setCacheMode(CacheMode.PARTITIONED)
             .setBackups(1)
             .setAtomicityMode(CacheAtomicityMode.ATOMIC);
 
-        cfg.setCacheConfiguration(singleCacheConfig);
+        cfg.setCacheConfiguration(singleCacheCfg);
+
+        // Persistence must be enabled to verify the case of restoring mappings from the file system.
+        if (isPersistenceEnabled)
+            cfg.setPersistentStoreConfiguration(new PersistentStoreConfiguration());
 
         return cfg;
     }
@@ -110,11 +121,14 @@ public class IgniteMarshallerCacheFSRestoreTest extends GridCommonAbstractTest {
      * In that case the request must not be marked as duplicate and must be processed in a regular way.
      * No hangs must take place.
      *
-     * @see <a href="https://issues.apache.org/jira/browse/IGNITE-5401">IGNITE-5401</a> Take a look at JIRA ticket for more information about context of this test.
+     * @see <a href="https://issues.apache.org/jira/browse/IGNITE-5401">IGNITE-5401</a> JIRA ticket
+     * provides more information about context of this test.
      *
      * This test must never hang on proposing of MarshallerMapping.
      */
     public void testFileMappingReadAndPropose() throws Exception {
+        isPersistenceEnabled = false;
+
         prepareMarshallerFileStore();
 
         IgniteEx ignite0 = startGrid(0);
@@ -162,6 +176,57 @@ public class IgniteMarshallerCacheFSRestoreTest extends GridCommonAbstractTest {
         }
     }
 
+    /**
+     * Verifies that a node with a corrupted marshaller mapping store fails on startup
+     * with an appropriate error message.
+     *
+     * @see <a href="https://issues.apache.org/jira/browse/IGNITE-6536">IGNITE-6536</a> JIRA provides more information
+     * about this case.
+     */
+    public void testNodeStartFailsOnCorruptedStorage() throws Exception {
+        isPersistenceEnabled = true;
+
+        Ignite ig = startGrids(3);
+
+        ig.active(true);
+
+        ig.cache(DEFAULT_CACHE_NAME).put(0, new SimpleValue(0, "value0"));
+
+        stopAllGrids();
+
+        corruptMarshallerStorage();
+
+        try {
+            startGrid(0);
+        }
+        catch (IgniteCheckedException e) {
+            verifyException((IgniteCheckedException) e.getCause());
+        }
+    }
+
+    /**
+     * The class name stored in the CustomClass mapping file is erased by truncating the file to zero length.
+     */
+    private void corruptMarshallerStorage() throws Exception {
+        String marshallerDir = U.defaultWorkDirectory() + File.separator + "marshaller";
+
+        File[] storedMappingsFiles = new File(marshallerDir).listFiles();
+
+        assert storedMappingsFiles.length == 1;
+
+        try (FileOutputStream out = new FileOutputStream(storedMappingsFiles[0])) {
+            out.getChannel().truncate(0);
+        }
+    }
+
+    /** */
+    private void verifyException(IgniteCheckedException e) throws Exception {
+        String msg = e.getMessage();
+
+        if (msg == null || !msg.contains("Class name is null"))
+            throw new Exception("Exception with unexpected message was thrown: " + msg, e);
+    }
+
     /** */
     private class TestTcpDiscoverySpi extends TcpDiscoverySpi {
 


[2/6] ignite git commit: IGNITE-6542 Reliably close SocketChannel in TcpCommunicationSpi.

Posted by sb...@apache.org.
IGNITE-6542 Reliably close SocketChannel in TcpCommunicationSpi.

Also fix forceClose() in GridTcpNioCommunicationClient which became wrong when migrated from int to bool. - Fixes #2787.

Signed-off-by: Alexey Goncharuk <al...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b0158fb2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b0158fb2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b0158fb2

Branch: refs/heads/ignite-3478
Commit: b0158fb2ab6de20922cc1b19597c5e17dae0b527
Parents: 8ffa109
Author: Ilya Kasnacheev <il...@gmail.com>
Authored: Wed Oct 11 15:29:04 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Wed Oct 11 15:29:04 2017 +0300

----------------------------------------------------------------------
 .../nio/GridAbstractCommunicationClient.java    |   2 +-
 .../communication/tcp/TcpCommunicationSpi.java  | 378 ++++++++++---------
 2 files changed, 192 insertions(+), 188 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b0158fb2/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
index 6302d84..ed7e929 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
@@ -59,7 +59,7 @@ public abstract class GridAbstractCommunicationClient implements GridCommunicati
 
     /** {@inheritDoc} */
     @Override public void forceClose() {
-        closed.set(false);
+        closed.set(true);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/b0158fb2/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 7a54666..a0ee389 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -2894,12 +2894,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
             }
 
             try {
-                safeHandshake(client,
-                    null,
-                    node.id(),
-                    timeoutHelper.nextTimeoutChunk(connTimeout0),
-                    null,
-                    null);
+                safeShmemHandshake(client, node.id(), timeoutHelper.nextTimeoutChunk(connTimeout0));
             }
             catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) {
                 client.forceClose();
@@ -3063,7 +3058,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
     protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
         LinkedHashSet<InetSocketAddress> addrs = nodeAddresses(node);
 
-        boolean conn = false;
         GridCommunicationClient client = null;
         IgniteCheckedException errs = null;
 
@@ -3079,7 +3073,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
 
             int lastWaitingTimeout = 1;
 
-            while (!conn) { // Reconnection on handshake timeout.
+            while (client == null) { // Reconnection on handshake timeout.
+                boolean needWait = false;
+
                 try {
                     SocketChannel ch = SocketChannel.open();
 
@@ -3111,7 +3107,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                         return null;
                     }
 
-                    Long rcvCnt = null;
+                    Long rcvCnt;
 
                     Map<Integer, Object> meta = new HashMap<>();
 
@@ -3132,7 +3128,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
 
                         Integer handshakeConnIdx = connIdx;
 
-                        rcvCnt = safeHandshake(ch,
+                        rcvCnt = safeTcpHandshake(ch,
                             recoveryDesc,
                             node.id(),
                             timeoutHelper.nextTimeoutChunk(connTimeout0),
@@ -3140,34 +3136,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                             handshakeConnIdx);
 
                         if (rcvCnt == ALREADY_CONNECTED) {
-                            recoveryDesc.release();
-
                             return null;
                         }
                         else if (rcvCnt == NODE_STOPPING) {
-                            recoveryDesc.release();
-
                             throw new ClusterTopologyCheckedException("Remote node started stop procedure: " + node.id());
                         }
                         else if (rcvCnt == NEED_WAIT) {
-                            recoveryDesc.release();
-
-                            U.closeQuiet(ch);
-
-                            if (lastWaitingTimeout < 60000)
-                                lastWaitingTimeout *= 2;
-
-                            U.sleep(lastWaitingTimeout);
+                            needWait = true;
 
                             continue;
                         }
-                    }
-                    finally {
-                        if (recoveryDesc != null && rcvCnt == null)
-                            recoveryDesc.release();
-                    }
 
-                    try {
                         meta.put(CONN_IDX_META, connKey);
 
                         if (recoveryDesc != null) {
@@ -3179,13 +3158,20 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                         GridNioSession ses = nioSrvr.createSession(ch, meta, false, null).get();
 
                         client = new GridTcpNioCommunicationClient(connIdx, ses, log);
-
-                        conn = true;
                     }
                     finally {
-                        if (!conn) {
+                        if (client == null) {
+                            U.closeQuiet(ch);
+
                             if (recoveryDesc != null)
                                 recoveryDesc.release();
+
+                            if (needWait) {
+                                if (lastWaitingTimeout < 60000)
+                                    lastWaitingTimeout *= 2;
+
+                                U.sleep(lastWaitingTimeout);
+                            }
                         }
                     }
                 }
@@ -3307,7 +3293,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                 }
             }
 
-            if (conn)
+            if (client != null)
                 break;
         }
 
@@ -3362,6 +3348,42 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
      * Performs handshake in timeout-safe way.
      *
      * @param client Client.
+     * @param rmtNodeId Remote node.
+     * @param timeout Timeout for handshake.
+     * @throws IgniteCheckedException If handshake failed or wasn't completed within timeout.
+     */
+    @SuppressWarnings("ThrowFromFinallyBlock")
+    private void safeShmemHandshake(
+        GridCommunicationClient client,
+        UUID rmtNodeId,
+        long timeout
+    ) throws IgniteCheckedException {
+        HandshakeTimeoutObject<GridCommunicationClient> obj = new HandshakeTimeoutObject<>(client,
+            U.currentTimeMillis() + timeout);
+
+        addTimeoutObject(obj);
+
+        try {
+            client.doHandshake(new HandshakeClosure(rmtNodeId));
+        }
+        finally {
+            boolean cancelled = obj.cancel();
+
+            if (cancelled)
+                removeTimeoutObject(obj);
+
+            // Ignoring whatever happened after timeout - reporting only timeout event.
+            if (!cancelled)
+                throw new HandshakeTimeoutException(
+                    new IgniteSpiOperationTimeoutException("Failed to perform handshake due to timeout " +
+                        "(consider increasing 'connectionTimeout' configuration property)."));
+        }
+    }
+
+    /**
+     * Performs handshake in timeout-safe way.
+     *
+     * @param ch Socket channel.
      * @param recovery Recovery descriptor if use recovery handshake, otherwise {@code null}.
      * @param rmtNodeId Remote node.
      * @param timeout Timeout for handshake.
@@ -3371,233 +3393,215 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
      * @return Handshake response.
      */
     @SuppressWarnings("ThrowFromFinallyBlock")
-    private <T> long safeHandshake(
-        T client,
+    private long safeTcpHandshake(
+        SocketChannel ch,
         @Nullable GridNioRecoveryDescriptor recovery,
         UUID rmtNodeId,
         long timeout,
         GridSslMeta sslMeta,
         @Nullable Integer handshakeConnIdx
     ) throws IgniteCheckedException {
-        HandshakeTimeoutObject<T> obj = new HandshakeTimeoutObject<>(client, U.currentTimeMillis() + timeout);
+        HandshakeTimeoutObject obj = new HandshakeTimeoutObject<>(ch, U.currentTimeMillis() + timeout);
 
         addTimeoutObject(obj);
 
         long rcvCnt = 0;
 
         try {
-            if (client instanceof GridCommunicationClient)
-                ((GridCommunicationClient)client).doHandshake(new HandshakeClosure(rmtNodeId));
-            else {
-                SocketChannel ch = (SocketChannel)client;
+            BlockingSslHandler sslHnd = null;
 
-                boolean success = false;
+            ByteBuffer buf;
 
-                try {
-                    BlockingSslHandler sslHnd = null;
+            if (isSslEnabled()) {
+                assert sslMeta != null;
 
-                    ByteBuffer buf;
+                sslHnd = new BlockingSslHandler(sslMeta.sslEngine(), ch, directBuf, ByteOrder.nativeOrder(), log);
 
-                    if (isSslEnabled()) {
-                        assert sslMeta != null;
+                if (!sslHnd.handshake())
+                    throw new HandshakeException("SSL handshake is not completed.");
 
-                        sslHnd = new BlockingSslHandler(sslMeta.sslEngine(), ch, directBuf, ByteOrder.nativeOrder(), log);
+                ByteBuffer handBuff = sslHnd.applicationBuffer();
 
-                        if (!sslHnd.handshake())
-                            throw new HandshakeException("SSL handshake is not completed.");
+                if (handBuff.remaining() < NodeIdMessage.MESSAGE_FULL_SIZE) {
+                    buf = ByteBuffer.allocate(1000);
 
-                        ByteBuffer handBuff = sslHnd.applicationBuffer();
+                    int read = ch.read(buf);
 
-                        if (handBuff.remaining() < NodeIdMessage.MESSAGE_FULL_SIZE) {
-                            buf = ByteBuffer.allocate(1000);
+                    if (read == -1)
+                        throw new HandshakeException("Failed to read remote node ID (connection closed).");
 
-                            int read = ch.read(buf);
+                    buf.flip();
 
-                            if (read == -1)
-                                throw new HandshakeException("Failed to read remote node ID (connection closed).");
-
-                            buf.flip();
-
-                            buf = sslHnd.decode(buf);
-                        }
-                        else
-                            buf = handBuff;
-                    }
-                    else {
-                        buf = ByteBuffer.allocate(NodeIdMessage.MESSAGE_FULL_SIZE);
+                    buf = sslHnd.decode(buf);
+                }
+                else
+                    buf = handBuff;
+            }
+            else {
+                buf = ByteBuffer.allocate(NodeIdMessage.MESSAGE_FULL_SIZE);
 
-                        for (int i = 0; i < NodeIdMessage.MESSAGE_FULL_SIZE; ) {
-                            int read = ch.read(buf);
+                for (int i = 0; i < NodeIdMessage.MESSAGE_FULL_SIZE; ) {
+                    int read = ch.read(buf);
 
-                            if (read == -1)
-                                throw new HandshakeException("Failed to read remote node ID (connection closed).");
+                    if (read == -1)
+                        throw new HandshakeException("Failed to read remote node ID (connection closed).");
 
-                            i += read;
-                        }
-                    }
+                    i += read;
+                }
+            }
 
-                    UUID rmtNodeId0 = U.bytesToUuid(buf.array(), Message.DIRECT_TYPE_SIZE);
+            UUID rmtNodeId0 = U.bytesToUuid(buf.array(), Message.DIRECT_TYPE_SIZE);
 
-                    if (!rmtNodeId.equals(rmtNodeId0))
-                        throw new HandshakeException("Remote node ID is not as expected [expected=" + rmtNodeId +
-                            ", rcvd=" + rmtNodeId0 + ']');
-                    else if (log.isDebugEnabled())
-                        log.debug("Received remote node ID: " + rmtNodeId0);
+            if (!rmtNodeId.equals(rmtNodeId0))
+                throw new HandshakeException("Remote node ID is not as expected [expected=" + rmtNodeId +
+                    ", rcvd=" + rmtNodeId0 + ']');
+            else if (log.isDebugEnabled())
+                log.debug("Received remote node ID: " + rmtNodeId0);
 
-                    if (isSslEnabled()) {
-                        assert sslHnd != null;
+            if (isSslEnabled()) {
+                assert sslHnd != null;
 
-                        ch.write(sslHnd.encrypt(ByteBuffer.wrap(U.IGNITE_HEADER)));
-                    }
-                    else
-                        ch.write(ByteBuffer.wrap(U.IGNITE_HEADER));
+                ch.write(sslHnd.encrypt(ByteBuffer.wrap(U.IGNITE_HEADER)));
+            }
+            else
+                ch.write(ByteBuffer.wrap(U.IGNITE_HEADER));
 
-                    ClusterNode locNode = getLocalNode();
+            ClusterNode locNode = getLocalNode();
 
-                    if (locNode == null)
-                        throw new IgniteCheckedException("Local node has not been started or " +
-                            "fully initialized [isStopping=" + getSpiContext().isStopping() + ']');
+            if (locNode == null)
+                throw new IgniteCheckedException("Local node has not been started or " +
+                    "fully initialized [isStopping=" + getSpiContext().isStopping() + ']');
 
-                    if (recovery != null) {
-                        HandshakeMessage msg;
+            if (recovery != null) {
+                HandshakeMessage msg;
 
-                        int msgSize = HandshakeMessage.MESSAGE_FULL_SIZE;
+                int msgSize = HandshakeMessage.MESSAGE_FULL_SIZE;
 
-                        if (handshakeConnIdx != null) {
-                            msg = new HandshakeMessage2(locNode.id(),
-                                recovery.incrementConnectCount(),
-                                recovery.received(),
-                                handshakeConnIdx);
+                if (handshakeConnIdx != null) {
+                    msg = new HandshakeMessage2(locNode.id(),
+                        recovery.incrementConnectCount(),
+                        recovery.received(),
+                        handshakeConnIdx);
 
-                            msgSize += 4;
-                        }
-                        else {
-                            msg = new HandshakeMessage(locNode.id(),
-                                recovery.incrementConnectCount(),
-                                recovery.received());
-                        }
+                    msgSize += 4;
+                }
+                else {
+                    msg = new HandshakeMessage(locNode.id(),
+                        recovery.incrementConnectCount(),
+                        recovery.received());
+                }
 
-                        if (log.isDebugEnabled())
-                            log.debug("Writing handshake message [locNodeId=" + locNode.id() +
-                                ", rmtNode=" + rmtNodeId + ", msg=" + msg + ']');
+                if (log.isDebugEnabled())
+                    log.debug("Writing handshake message [locNodeId=" + locNode.id() +
+                        ", rmtNode=" + rmtNodeId + ", msg=" + msg + ']');
 
-                        buf = ByteBuffer.allocate(msgSize);
+                buf = ByteBuffer.allocate(msgSize);
 
-                        buf.order(ByteOrder.nativeOrder());
+                buf.order(ByteOrder.nativeOrder());
 
-                        boolean written = msg.writeTo(buf, null);
+                boolean written = msg.writeTo(buf, null);
 
-                        assert written;
+                assert written;
 
-                        buf.flip();
+                buf.flip();
 
-                        if (isSslEnabled()) {
-                            assert sslHnd != null;
+                if (isSslEnabled()) {
+                    assert sslHnd != null;
 
-                            ch.write(sslHnd.encrypt(buf));
-                        }
-                        else
-                            ch.write(buf);
-                    }
-                    else {
-                        if (isSslEnabled()) {
-                            assert sslHnd != null;
+                    ch.write(sslHnd.encrypt(buf));
+                }
+                else
+                    ch.write(buf);
+            }
+            else {
+                if (isSslEnabled()) {
+                    assert sslHnd != null;
 
-                            ch.write(sslHnd.encrypt(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType)));
-                        }
-                        else
-                            ch.write(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType));
-                    }
+                    ch.write(sslHnd.encrypt(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType)));
+                }
+                else
+                    ch.write(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType));
+            }
 
-                    if (recovery != null) {
-                        if (log.isDebugEnabled())
-                            log.debug("Waiting for handshake [rmtNode=" + rmtNodeId + ']');
+            if (recovery != null) {
+                if (log.isDebugEnabled())
+                    log.debug("Waiting for handshake [rmtNode=" + rmtNodeId + ']');
 
-                        if (isSslEnabled()) {
-                            assert sslHnd != null;
+                if (isSslEnabled()) {
+                    assert sslHnd != null;
 
-                            buf = ByteBuffer.allocate(1000);
-                            buf.order(ByteOrder.nativeOrder());
+                    buf = ByteBuffer.allocate(1000);
+                    buf.order(ByteOrder.nativeOrder());
 
-                            ByteBuffer decode = ByteBuffer.allocate(2 * buf.capacity());
-                            decode.order(ByteOrder.nativeOrder());
+                    ByteBuffer decode = ByteBuffer.allocate(2 * buf.capacity());
+                    decode.order(ByteOrder.nativeOrder());
 
-                            for (int i = 0; i < RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE; ) {
-                                int read = ch.read(buf);
+                    for (int i = 0; i < RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE; ) {
+                        int read = ch.read(buf);
 
-                                if (read == -1)
-                                    throw new HandshakeException("Failed to read remote node recovery handshake " +
-                                        "(connection closed).");
+                        if (read == -1)
+                            throw new HandshakeException("Failed to read remote node recovery handshake " +
+                                "(connection closed).");
 
-                                buf.flip();
+                        buf.flip();
 
-                                ByteBuffer decode0 = sslHnd.decode(buf);
+                        ByteBuffer decode0 = sslHnd.decode(buf);
 
-                                i += decode0.remaining();
+                        i += decode0.remaining();
 
-                                decode = appendAndResizeIfNeeded(decode, decode0);
+                        decode = appendAndResizeIfNeeded(decode, decode0);
 
-                                buf.clear();
-                            }
+                        buf.clear();
+                    }
 
-                            decode.flip();
+                    decode.flip();
 
-                            rcvCnt = decode.getLong(Message.DIRECT_TYPE_SIZE);
+                    rcvCnt = decode.getLong(Message.DIRECT_TYPE_SIZE);
 
-                            if (decode.limit() > RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE) {
-                                decode.position(RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE);
+                    if (decode.limit() > RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE) {
+                        decode.position(RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE);
 
-                                sslMeta.decodedBuffer(decode);
-                            }
+                        sslMeta.decodedBuffer(decode);
+                    }
 
-                            ByteBuffer inBuf = sslHnd.inputBuffer();
+                    ByteBuffer inBuf = sslHnd.inputBuffer();
 
-                            if (inBuf.position() > 0)
-                                sslMeta.encodedBuffer(inBuf);
-                        }
-                        else {
-                            buf = ByteBuffer.allocate(RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE);
+                    if (inBuf.position() > 0)
+                        sslMeta.encodedBuffer(inBuf);
+                }
+                else {
+                    buf = ByteBuffer.allocate(RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE);
 
-                            buf.order(ByteOrder.nativeOrder());
+                    buf.order(ByteOrder.nativeOrder());
 
-                            for (int i = 0; i < RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE; ) {
-                                int read = ch.read(buf);
+                    for (int i = 0; i < RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE; ) {
+                        int read = ch.read(buf);
 
-                                if (read == -1)
-                                    throw new HandshakeException("Failed to read remote node recovery handshake " +
-                                        "(connection closed).");
+                        if (read == -1)
+                            throw new HandshakeException("Failed to read remote node recovery handshake " +
+                                "(connection closed).");
 
-                                i += read;
-                            }
+                        i += read;
+                    }
 
-                            rcvCnt = buf.getLong(Message.DIRECT_TYPE_SIZE);
-                        }
+                    rcvCnt = buf.getLong(Message.DIRECT_TYPE_SIZE);
+                }
 
-                        if (log.isDebugEnabled())
-                            log.debug("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']');
+                if (log.isDebugEnabled())
+                    log.debug("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']');
 
-                        if (rcvCnt == -1) {
-                            if (log.isDebugEnabled())
-                                log.debug("Connection rejected, will retry client creation [rmtNode=" + rmtNodeId + ']');
-                        }
-                        else
-                            success = true;
-                    }
-                    else
-                        success = true;
-                }
-                catch (IOException e) {
+                if (rcvCnt == -1) {
                     if (log.isDebugEnabled())
-                        log.debug("Failed to read from channel: " + e);
-
-                    throw new IgniteCheckedException("Failed to read from channel.", e);
-                }
-                finally {
-                    if (!success)
-                        U.closeQuiet(ch);
+                        log.debug("Connection rejected, will retry client creation [rmtNode=" + rmtNodeId + ']');
                 }
             }
         }
+        catch (IOException e) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to read from channel: " + e);
+
+            throw new IgniteCheckedException("Failed to read from channel.", e);
+        }
         finally {
             boolean cancelled = obj.cancel();
 


[4/6] ignite git commit: IGNITE-5928 .NET: Get rid of BinaryStreamBase

Posted by sb...@apache.org.
IGNITE-5928 .NET: Get rid of BinaryStreamBase

This closes #2835


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b8b7c508
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b8b7c508
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b8b7c508

Branch: refs/heads/ignite-3478
Commit: b8b7c50865d37449616cc6dadeeaba84526945f4
Parents: 5490c7d
Author: Alexey Popov <ta...@gmail.com>
Authored: Thu Oct 12 12:44:00 2017 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Thu Oct 12 12:44:00 2017 +0300

----------------------------------------------------------------------
 .../Apache.Ignite.Core.csproj                   |    1 -
 .../Impl/Binary/Io/BinaryHeapStream.cs          | 1018 +++++++++++++-
 .../Impl/Binary/Io/BinaryStreamBase.cs          | 1249 ------------------
 3 files changed, 958 insertions(+), 1310 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b8b7c508/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
index 58abd26..446208a 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
@@ -431,7 +431,6 @@
     <Compile Include="Impl\Messaging\Messaging.cs" />
     <Compile Include="Impl\NativeMethods.cs" />
     <Compile Include="Impl\Binary\IO\IBinaryStream.cs" />
-    <Compile Include="Impl\Binary\IO\BinaryStreamBase.cs" />
     <Compile Include="Impl\Binary\IO\BinaryHeapStream.cs" />
     <Compile Include="Impl\Binary\IBinaryTypeDescriptor.cs" />
     <Compile Include="Impl\Binary\IBinaryWriteAware.cs" />

http://git-wip-us.apache.org/repos/asf/ignite/blob/b8b7c508/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryHeapStream.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryHeapStream.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryHeapStream.cs
index a14da0a..a6082f1 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryHeapStream.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryHeapStream.cs
@@ -22,12 +22,28 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
     using System.Diagnostics.CodeAnalysis;
     using System.IO;
     using System.Text;
+    using Apache.Ignite.Core.Impl.Memory;
 
     /// <summary>
     /// Binary onheap stream.
     /// </summary>
-    internal unsafe class BinaryHeapStream : BinaryStreamBase
+    internal unsafe class BinaryHeapStream : IBinaryStream
     {
+        /** Byte: zero. */
+        private const byte ByteZero = 0;
+
+        /** Byte: one. */
+        private const byte ByteOne = 1;
+
+        /** LITTLE_ENDIAN flag. */
+        private static readonly bool LittleEndian = BitConverter.IsLittleEndian;
+
+        /** Position. */
+        private int _pos;
+
+        /** Disposed flag. */
+        private bool _disposed;
+
         /** Data array. */
         private byte[] _data;
 
@@ -53,8 +69,903 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
             _data = data;
         }
 
+        /// <summary>
+        /// Internal routine to write byte array.
+        /// </summary>
+        /// <param name="val">Byte array.</param>
+        /// <param name="data">Data pointer.</param>
+        private static void WriteByteArray0(byte[] val, byte* data)
+        {
+            fixed (byte* val0 = val)
+            {
+                CopyMemory(val0, data, val.Length);
+            }
+        }
+
+        /// <summary>
+        /// Internal routine to read byte array.
+        /// </summary>
+        /// <param name="len">Array length.</param>
+        /// <param name="data">Data pointer.</param>
+        /// <returns>Byte array</returns>
+        private static byte[] ReadByteArray0(int len, byte* data)
+        {
+            byte[] res = new byte[len];
+
+            fixed (byte* res0 = res)
+            {
+                CopyMemory(data, res0, len);
+            }
+
+            return res;
+        }
+
+        /** <inheritdoc /> */
+        public void WriteBool(bool val)
+        {
+            WriteByte(val ? ByteOne : ByteZero);
+        }
+
+        /** <inheritdoc /> */
+        public bool ReadBool()
+        {
+            return ReadByte() == ByteOne;
+        }
+
+        /// <summary>
+        /// Internal routine to write bool array.
+        /// </summary>
+        /// <param name="val">Bool array.</param>
+        /// <param name="data">Data pointer.</param>
+        private static void WriteBoolArray0(bool[] val, byte* data)
+        {
+            fixed (bool* val0 = val)
+            {
+                CopyMemory((byte*)val0, data, val.Length);
+            }
+        }
+
+        /// <summary>
+        /// Internal routine to read bool array.
+        /// </summary>
+        /// <param name="len">Array length.</param>
+        /// <param name="data">Data pointer.</param>
+        /// <returns>Bool array</returns>
+        private static bool[] ReadBoolArray0(int len, byte* data)
+        {
+            bool[] res = new bool[len];
+
+            fixed (bool* res0 = res)
+            {
+                CopyMemory(data, (byte*)res0, len);
+            }
+
+            return res;
+        }
+
+        /// <summary>
+        /// Internal routine to write short value.
+        /// </summary>
+        /// <param name="val">Short value.</param>
+        /// <param name="data">Data pointer.</param>
+        private static void WriteShort0(short val, byte* data)
+        {
+            if (LittleEndian)
+                *((short*)data) = val;
+            else
+            {
+                byte* valPtr = (byte*)&val;
+
+                data[0] = valPtr[1];
+                data[1] = valPtr[0];
+            }
+        }
+
+        /// <summary>
+        /// Internal routine to read short value.
+        /// </summary>
+        /// <param name="data">Data pointer.</param>
+        /// <returns>Short value</returns>
+        private static short ReadShort0(byte* data)
+        {
+            short val;
+
+            if (LittleEndian)
+                val = *((short*)data);
+            else
+            {
+                byte* valPtr = (byte*)&val;
+
+                valPtr[0] = data[1];
+                valPtr[1] = data[0];
+            }
+
+            return val;
+        }
+
+        /// <summary>
+        /// Internal routine to write short array.
+        /// </summary>
+        /// <param name="val">Short array.</param>
+        /// <param name="data">Data pointer.</param>
+        /// <param name="cnt">Bytes count.</param>
+        private static void WriteShortArray0(short[] val, byte* data, int cnt)
+        {
+            if (LittleEndian)
+            {
+                fixed (short* val0 = val)
+                {
+                    CopyMemory((byte*)val0, data, cnt);
+                }
+            }
+            else
+            {
+                byte* curPos = data;
+
+                for (int i = 0; i < val.Length; i++)
+                {
+                    short val0 = val[i];
+
+                    byte* valPtr = (byte*)&(val0);
+                    
+                    *curPos++ = valPtr[1];
+                    *curPos++ = valPtr[0];
+                }
+            }
+        }
+
+        /// <summary>
+        /// Internal routine to read short array.
+        /// </summary>
+        /// <param name="len">Array length.</param>
+        /// <param name="data">Data pointer.</param>
+        /// <param name="cnt">Bytes count.</param>
+        /// <returns>Short array</returns>
+        private static short[] ReadShortArray0(int len, byte* data, int cnt)
+        {
+            short[] res = new short[len];
+
+            if (LittleEndian)
+            {
+                fixed (short* res0 = res)
+                {
+                    CopyMemory(data, (byte*)res0, cnt);
+                }
+            }
+            else
+            {
+                for (int i = 0; i < len; i++)
+                {
+                    short val;
+
+                    byte* valPtr = (byte*)&val;
+
+                    valPtr[1] = *data++;
+                    valPtr[0] = *data++;
+
+                    res[i] = val;
+                }
+            }
+
+            return res;
+        }
+
+        /** <inheritdoc /> */
+        public void WriteChar(char val)
+        {
+            WriteShort(*(short*)(&val));
+        }
+
+        /** <inheritdoc /> */
+        public char ReadChar()
+        {
+            short val = ReadShort();
+
+            return *(char*)(&val);
+        }
+
+        /// <summary>
+        /// Internal routine to write char array.
+        /// </summary>
+        /// <param name="val">Char array.</param>
+        /// <param name="data">Data pointer.</param>
+        /// <param name="cnt">Bytes count.</param>
+        private static void WriteCharArray0(char[] val, byte* data, int cnt)
+        {
+            if (LittleEndian)
+            {
+                fixed (char* val0 = val)
+                {
+                    CopyMemory((byte*)val0, data, cnt);
+                }
+            }
+            else
+            {
+                byte* curPos = data;
+
+                for (int i = 0; i < val.Length; i++)
+                {
+                    char val0 = val[i];
+
+                    byte* valPtr = (byte*)&(val0);
+
+                    *curPos++ = valPtr[1];
+                    *curPos++ = valPtr[0];
+                }
+            }
+        }
+
+        /// <summary>
+        /// Internal routine to read char array.
+        /// </summary>
+        /// <param name="len">Count.</param>
+        /// <param name="data">Data pointer.</param>
+        /// <param name="cnt">Bytes count.</param>
+        /// <returns>Char array</returns>
+        private static char[] ReadCharArray0(int len, byte* data, int cnt)
+        {
+            char[] res = new char[len];
+
+            if (LittleEndian)
+            {
+                fixed (char* res0 = res)
+                {
+                    CopyMemory(data, (byte*)res0, cnt);
+                }
+            }
+            else
+            {
+                for (int i = 0; i < len; i++)
+                {
+                    char val;
+
+                    byte* valPtr = (byte*)&val;
+
+                    valPtr[1] = *data++;
+                    valPtr[0] = *data++;
+
+                    res[i] = val;
+                }
+            }
+
+            return res;
+        }
+
+        /// <summary>
+        /// Internal routine to write int value.
+        /// </summary>
+        /// <param name="val">Int value.</param>
+        /// <param name="data">Data pointer.</param>
+        private static void WriteInt0(int val, byte* data)
+        {
+            if (LittleEndian)
+                *((int*)data) = val;
+            else
+            {
+                byte* valPtr = (byte*)&val;
+
+                data[0] = valPtr[3];
+                data[1] = valPtr[2];
+                data[2] = valPtr[1];
+                data[3] = valPtr[0];
+            }
+        }
+
+        /// <summary>
+        /// Internal routine to read int value.
+        /// </summary>
+        /// <param name="data">Data pointer.</param>
+        /// <returns>Int value</returns>
+        private static int ReadInt0(byte* data) {
+            int val;
+
+            if (LittleEndian)
+                val = *((int*)data);
+            else
+            {
+                byte* valPtr = (byte*)&val;
+
+                valPtr[0] = data[3];
+                valPtr[1] = data[2];
+                valPtr[2] = data[1];
+                valPtr[3] = data[0];
+            }
+            
+            return val;
+        }
+
+        /// <summary>
+        /// Internal routine to write int array.
+        /// </summary>
+        /// <param name="val">Int array.</param>
+        /// <param name="data">Data pointer.</param>
+        /// <param name="cnt">Bytes count.</param>
+        private static void WriteIntArray0(int[] val, byte* data, int cnt)
+        {
+            if (LittleEndian)
+            {
+                fixed (int* val0 = val)
+                {
+                    CopyMemory((byte*)val0, data, cnt);
+                }
+            }
+            else
+            {
+                byte* curPos = data;
+
+                for (int i = 0; i < val.Length; i++)
+                {
+                    int val0 = val[i];
+
+                    byte* valPtr = (byte*)&(val0);
+
+                    *curPos++ = valPtr[3];
+                    *curPos++ = valPtr[2];
+                    *curPos++ = valPtr[1];
+                    *curPos++ = valPtr[0];
+                }
+            }
+        }
+
+        /// <summary>
+        /// Internal routine to read int array.
+        /// </summary>
+        /// <param name="len">Count.</param>
+        /// <param name="data">Data pointer.</param>
+        /// <param name="cnt">Bytes count.</param>
+        /// <returns>Int array</returns>
+        private static int[] ReadIntArray0(int len, byte* data, int cnt)
+        {
+            int[] res = new int[len];
+
+            if (LittleEndian)
+            {
+                fixed (int* res0 = res)
+                {
+                    CopyMemory(data, (byte*)res0, cnt);
+                }
+            }
+            else
+            {
+                for (int i = 0; i < len; i++)
+                {
+                    int val;
+
+                    byte* valPtr = (byte*)&val;
+
+                    valPtr[3] = *data++;
+                    valPtr[2] = *data++;
+                    valPtr[1] = *data++;
+                    valPtr[0] = *data++;
+
+                    res[i] = val;
+                }
+            }
+
+            return res;
+        }
+
+        /** <inheritdoc /> */
+        public void WriteFloat(float val)
+        {
+            int val0 = *(int*)(&val);
+
+            WriteInt(val0);
+        }
+
+        /** <inheritdoc /> */
+        public float ReadFloat()
+        {
+            int val = ReadInt();
+
+            return BinaryUtils.IntToFloatBits(val);
+        }
+
+        /// <summary>
+        /// Internal routine to write float array.
+        /// </summary>
+        /// <param name="val">Float array.</param>
+        /// <param name="data">Data pointer.</param>
+        /// <param name="cnt">Bytes count.</param>
+        private static void WriteFloatArray0(float[] val, byte* data, int cnt)
+        {
+            if (LittleEndian)
+            {
+                fixed (float* val0 = val)
+                {
+                    CopyMemory((byte*)val0, data, cnt);
+                }
+            }
+            else
+            {
+                byte* curPos = data;
+
+                for (int i = 0; i < val.Length; i++)
+                {
+                    float val0 = val[i];
+
+                    byte* valPtr = (byte*)&(val0);
+
+                    *curPos++ = valPtr[3];
+                    *curPos++ = valPtr[2];
+                    *curPos++ = valPtr[1];
+                    *curPos++ = valPtr[0];
+                }
+            }
+        }
+
+        /// <summary>
+        /// Internal routine to read float array.
+        /// </summary>
+        /// <param name="len">Number of elements to read.</param>
+        /// <param name="data">Source pointer.</param>
+        /// <param name="cnt">Bytes count; only used by the bulk-copy (LittleEndian) path.</param>
+        /// <returns>Float array</returns>
+        private static float[] ReadFloatArray0(int len, byte* data, int cnt)
+        {
+            float[] res = new float[len];
+
+            if (LittleEndian)
+            {
+                fixed (float* res0 = res)
+                {
+                    CopyMemory(data, (byte*)res0, cnt);
+                }
+            }
+            else
+            {
+                for (int i = 0; i < len; i++)
+                {
+                    float val; // Must be float: an int here would be value-converted, not bit-copied, on assignment.
+
+                    byte* valPtr = (byte*)&val;
+
+                    valPtr[3] = *data++; // Bytes are consumed in reverse order.
+                    valPtr[2] = *data++;
+                    valPtr[1] = *data++;
+                    valPtr[0] = *data++;
+
+                    res[i] = val;
+                }
+            }
+
+            return res;
+        }
+
+        /// <summary>
+        /// Internal routine to write long value.
+        /// </summary>
+        /// <param name="val">Long value.</param>
+        /// <param name="data">Destination pointer; eight bytes are written starting here.</param>
+        private static void WriteLong0(long val, byte* data)
+        {
+            if (LittleEndian)
+                *((long*)data) = val; // Direct 8-byte store.
+            else
+            {
+                byte* valPtr = (byte*)&val;
+
+                data[0] = valPtr[7]; // Store the value's bytes in reverse order.
+                data[1] = valPtr[6];
+                data[2] = valPtr[5];
+                data[3] = valPtr[4];
+                data[4] = valPtr[3];
+                data[5] = valPtr[2];
+                data[6] = valPtr[1];
+                data[7] = valPtr[0];
+            }
+        }
+
+        /// <summary>
+        /// Internal routine to read long value.
+        /// </summary>
+        /// <param name="data">Source pointer; eight bytes are read starting here.</param>
+        /// <returns>Long value</returns>
+        private static long ReadLong0(byte* data)
+        {
+            long val;
+
+            if (LittleEndian)
+                val = *((long*)data); // Direct 8-byte load.
+            else
+            {
+                byte* valPtr = (byte*)&val;
+
+                valPtr[0] = data[7]; // Assemble the value from the source bytes in reverse order.
+                valPtr[1] = data[6];
+                valPtr[2] = data[5];
+                valPtr[3] = data[4];
+                valPtr[4] = data[3];
+                valPtr[5] = data[2];
+                valPtr[6] = data[1];
+                valPtr[7] = data[0];
+            }
+
+            return val;
+        }
+
+        /// <summary>
+        /// Internal routine to write long array.
+        /// </summary>
+        /// <param name="val">Long array.</param>
+        /// <param name="data">Destination pointer.</param>
+        /// <param name="cnt">Bytes count; only used by the bulk-copy (LittleEndian) path.</param>
+        private static void WriteLongArray0(long[] val, byte* data, int cnt)
+        {
+            if (LittleEndian)
+            {
+                fixed (long* val0 = val)
+                {
+                    CopyMemory((byte*)val0, data, cnt);
+                }
+            }
+            else
+            {
+                byte* curPos = data;
+
+                for (int i = 0; i < val.Length; i++)
+                {
+                    long val0 = val[i];
+
+                    byte* valPtr = (byte*)&(val0);
+
+                    *curPos++ = valPtr[7]; // Each element's bytes are emitted in reverse order.
+                    *curPos++ = valPtr[6];
+                    *curPos++ = valPtr[5];
+                    *curPos++ = valPtr[4];
+                    *curPos++ = valPtr[3];
+                    *curPos++ = valPtr[2];
+                    *curPos++ = valPtr[1];
+                    *curPos++ = valPtr[0];
+                }
+            }
+        }
+
+        /// <summary>
+        /// Internal routine to read long array.
+        /// </summary>
+        /// <param name="len">Number of elements to read.</param>
+        /// <param name="data">Source pointer.</param>
+        /// <param name="cnt">Bytes count; only used by the bulk-copy (LittleEndian) path.</param>
+        /// <returns>Long array</returns>
+        private static long[] ReadLongArray0(int len, byte* data, int cnt)
+        {
+            long[] res = new long[len];
+
+            if (LittleEndian)
+            {
+                fixed (long* res0 = res)
+                {
+                    CopyMemory(data, (byte*)res0, cnt);
+                }
+            }
+            else
+            {
+                for (int i = 0; i < len; i++)
+                {
+                    long val;
+
+                    byte* valPtr = (byte*)&val;
+
+                    valPtr[7] = *data++; // Bytes are consumed in reverse order.
+                    valPtr[6] = *data++;
+                    valPtr[5] = *data++;
+                    valPtr[4] = *data++;
+                    valPtr[3] = *data++;
+                    valPtr[2] = *data++;
+                    valPtr[1] = *data++;
+                    valPtr[0] = *data++;
+
+                    res[i] = val;
+                }
+            }
+
+            return res;
+        }
+
+        /** <inheritdoc /> */
+        public void WriteDouble(double val)
+        {
+            long val0 = *(long*)(&val); // Reinterpret the raw bits; no numeric conversion.
+
+            WriteLong(val0);
+        }
+
+        /** <inheritdoc /> */
+        public double ReadDouble()
+        {
+            long val = ReadLong(); // Raw 64-bit pattern, read the same way WriteDouble stores it.
+
+            return BinaryUtils.LongToDoubleBits(val); // NOTE(review): presumably a bit reinterpretation - confirm in BinaryUtils.
+        }
+
+        /// <summary>
+        /// Internal routine to write double array.
+        /// </summary>
+        /// <param name="val">Double array.</param>
+        /// <param name="data">Destination pointer.</param>
+        /// <param name="cnt">Bytes count; only used by the bulk-copy (LittleEndian) path.</param>
+        private static void WriteDoubleArray0(double[] val, byte* data, int cnt)
+        {
+            if (LittleEndian)
+            {
+                fixed (double* val0 = val)
+                {
+                    CopyMemory((byte*)val0, data, cnt);
+                }
+            }
+            else
+            {
+                byte* curPos = data;
+
+                for (int i = 0; i < val.Length; i++)
+                {
+                    double val0 = val[i];
+
+                    byte* valPtr = (byte*)&(val0);
+
+                    *curPos++ = valPtr[7]; // Each element's bytes are emitted in reverse order.
+                    *curPos++ = valPtr[6];
+                    *curPos++ = valPtr[5];
+                    *curPos++ = valPtr[4];
+                    *curPos++ = valPtr[3];
+                    *curPos++ = valPtr[2];
+                    *curPos++ = valPtr[1];
+                    *curPos++ = valPtr[0];
+                }
+            }
+        }
+
+        /// <summary>
+        /// Internal routine to read double array.
+        /// </summary>
+        /// <param name="len">Number of elements to read.</param>
+        /// <param name="data">Source pointer.</param>
+        /// <param name="cnt">Bytes count; only used by the bulk-copy (LittleEndian) path.</param>
+        /// <returns>Double array</returns>
+        private static double[] ReadDoubleArray0(int len, byte* data, int cnt)
+        {
+            double[] res = new double[len];
+
+            if (LittleEndian)
+            {
+                fixed (double* res0 = res)
+                {
+                    CopyMemory(data, (byte*)res0, cnt);
+                }
+            }
+            else
+            {
+                for (int i = 0; i < len; i++)
+                {
+                    double val; // Assembled directly as a double, so the assignment below copies bits unchanged.
+
+                    byte* valPtr = (byte*)&val;
+
+                    valPtr[7] = *data++; // Bytes are consumed in reverse order.
+                    valPtr[6] = *data++;
+                    valPtr[5] = *data++;
+                    valPtr[4] = *data++;
+                    valPtr[3] = *data++;
+                    valPtr[2] = *data++;
+                    valPtr[1] = *data++;
+                    valPtr[0] = *data++;
+
+                    res[i] = val;
+                }
+            }
+
+            return res;
+        }
+
+        /** <inheritdoc /> */
+        public void Write(byte[] src, int off, int cnt)
+        {
+            fixed (byte* src0 = src) // Pin the array so a raw pointer into it can be handed on.
+            {
+                Write(src0 + off, cnt); // off and cnt are not range-checked in this method.
+            }
+        }
+
+        /** <inheritdoc /> */
+        public void Read(byte[] dest, int off, int cnt)
+        {
+            fixed (byte* dest0 = dest) // Pin the array so a raw pointer into it can be handed on.
+            {
+                Read(dest0 + off, cnt); // off and cnt are not range-checked in this method.
+            }
+        }
+
+        /// <summary>
+        /// Internal write routine: copies bytes to the current position without advancing it.
+        /// </summary>
+        /// <param name="src">Source.</param>
+        /// <param name="cnt">Count.</param>
+        /// <param name="data">Data (destination).</param>
+        private void WriteInternal(byte* src, int cnt, byte* data)
+        {
+            CopyMemory(src, data + _pos, cnt);
+        }
+
+        /// <summary>
+        /// Internal read routine.
+        /// </summary>
+        /// <param name="src">Source; reading starts at the current position.</param>
+        /// <param name="dest">Destination.</param>
+        /// <param name="cnt">Count.</param>
+        /// <remarks>Copies at most Remaining bytes and advances the position by the amount copied.</remarks>
+        private void ReadInternal(byte* src, byte* dest, int cnt)
+        {
+            int cnt0 = Math.Min(Remaining, cnt); // A request larger than Remaining is truncated, not rejected.
+
+            CopyMemory(src + _pos, dest, cnt0);
+
+            ShiftRead(cnt0);
+        }
+
+        /** <inheritdoc /> */
+        public int Position
+        {
+            get { return _pos; } // Same cursor that ShiftRead and ShiftWrite advance.
+        }
+
         /** <inheritdoc /> */
-        public override void WriteByte(byte val)
+        public int Remaining
+        {
+            get { return _data.Length - _pos; } // Measured against the full backing array length.
+        }
+
+        /// <summary>
+        /// Internal array: the backing array itself, not a copy.
+        /// </summary>
+        internal byte[] InternalArray
+        {
+            get { return _data; }
+        }
+
+        /// <inheritdoc />
+        /// <exception cref="T:System.ArgumentException">
+        /// <paramref name="origin" /> is neither <c>Begin</c> nor <c>Current</c>,
+        /// or
+        /// the resulting position is negative.
+        /// </exception>
+        public int Seek(int offset, SeekOrigin origin)
+        {
+            int newPos;
+
+            switch (origin)
+            {
+                case SeekOrigin.Begin:
+                    {
+                        newPos = offset;
+
+                        break;
+                    }
+
+                case SeekOrigin.Current:
+                    {
+                        newPos = _pos + offset;
+
+                        break;
+                    }
+
+                default:
+                    throw new ArgumentException("Unsupported seek origin: " + origin);
+            }
+
+            if (newPos < 0)
+                throw new ArgumentException("Seek before origin: " + newPos);
+
+            EnsureWriteCapacity(newPos); // Called unconditionally before the position is moved.
+
+            _pos = newPos;
+
+            return _pos;
+        }
+
+        /** <inheritdoc /> */
+        public void Flush()
+        {
+            // No-op: this method performs no work.
+        }
+
+        /** <inheritdoc /> */
+        public void Dispose()
+        {
+            if (_disposed)
+                return; // Repeated calls do nothing.
+
+            GC.SuppressFinalize(this); // Nothing else is released here; only the flag below is set.
+
+            _disposed = true;
+        }
+
+        /// <summary>
+        /// Ensure capacity for write and shift position.
+        /// </summary>
+        /// <param name="cnt">Number of bytes about to be written; the position advances by this amount.</param>
+        /// <returns>Position before shift.</returns>
+        private int EnsureWriteCapacityAndShift(int cnt)
+        {
+            int pos0 = _pos; // Captured first so the caller knows where to write.
+
+            EnsureWriteCapacity(_pos + cnt);
+
+            ShiftWrite(cnt);
+
+            return pos0;
+        }
+
+        /// <summary>
+        /// Ensure capacity for read and shift position.
+        /// </summary>
+        /// <param name="cnt">Number of bytes about to be read; the position advances by this amount.</param>
+        /// <returns>Position before shift.</returns>
+        private int EnsureReadCapacityAndShift(int cnt)
+        {
+            int pos0 = _pos; // Captured first so the caller knows where to read from.
+
+            EnsureReadCapacity(cnt);
+
+            ShiftRead(cnt);
+
+            return pos0;
+        }
+
+        /// <summary>
+        /// Shift position due to write.
+        /// </summary>
+        /// <param name="cnt">Number of bytes written.</param>
+        private void ShiftWrite(int cnt)
+        {
+            _pos += cnt;
+        }
+
+        /// <summary>
+        /// Shift position due to read.
+        /// </summary>
+        /// <param name="cnt">Number of bytes read.</param>
+        private void ShiftRead(int cnt)
+        {
+            _pos += cnt;
+        }
+
+        /// <summary>
+        /// Calculate new capacity: 256 if less than 256 is required, else the larger of twice the current capacity and the requirement.
+        /// </summary>
+        /// <param name="curCap">Current capacity.</param>
+        /// <param name="reqCap">Required capacity.</param>
+        /// <returns>New capacity; never less than <paramref name="reqCap" />.</returns>
+        private static int Capacity(int curCap, int reqCap)
+        {
+            int newCap;
+
+            if (reqCap < 256)
+                newCap = 256;
+            else
+            {
+                newCap = curCap << 1;
+
+                if (newCap < reqCap) // Also covers the shift overflowing to a negative value.
+                    newCap = reqCap;
+            }
+
+            return newCap;
+        }
+
+        /// <summary>
+        /// Unsafe memory copy routine; forwards to PlatformMemoryUtils.CopyMemory.
+        /// </summary>
+        /// <param name="src">Source.</param>
+        /// <param name="dest">Destination.</param>
+        /// <param name="len">Length in bytes.</param>
+        private static void CopyMemory(byte* src, byte* dest, int len)
+        {
+            PlatformMemoryUtils.CopyMemory(src, dest, len);
+        }
+
+        /** <inheritdoc /> */
+        public void WriteByte(byte val)
         {
             int pos0 = EnsureWriteCapacityAndShift(1);
 
@@ -62,7 +973,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
         }
 
         /** <inheritdoc /> */
-        public override byte ReadByte()
+        public byte ReadByte()
         {
             int pos0 = EnsureReadCapacityAndShift(1);
 
@@ -71,7 +982,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
 
         /** <inheritdoc /> */
         [SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
-        public override void WriteByteArray(byte[] val)
+        public void WriteByteArray(byte[] val)
         {
             int pos0 = EnsureWriteCapacityAndShift(val.Length);
 
@@ -82,7 +993,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
         }
 
         /** <inheritdoc /> */
-        public override byte[] ReadByteArray(int cnt)
+        public byte[] ReadByteArray(int cnt)
         {
             int pos0 = EnsureReadCapacityAndShift(cnt);
 
@@ -94,7 +1005,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
 
         /** <inheritdoc /> */
         [SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
-        public override void WriteBoolArray(bool[] val)
+        public void WriteBoolArray(bool[] val)
         {
             int pos0 = EnsureWriteCapacityAndShift(val.Length);
 
@@ -105,7 +1016,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
         }
 
         /** <inheritdoc /> */
-        public override bool[] ReadBoolArray(int cnt)
+        public bool[] ReadBoolArray(int cnt)
         {
             int pos0 = EnsureReadCapacityAndShift(cnt);
 
@@ -116,7 +1027,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
         }
 
         /** <inheritdoc /> */
-        public override void WriteShort(short val)
+        public void WriteShort(short val)
         {
             int pos0 = EnsureWriteCapacityAndShift(2);
 
@@ -127,7 +1038,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
         }
 
         /** <inheritdoc /> */
-        public override short ReadShort()
+        public short ReadShort()
         {
             int pos0 = EnsureReadCapacityAndShift(2);
 
@@ -139,7 +1050,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
 
         /** <inheritdoc /> */
         [SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
-        public override void WriteShortArray(short[] val)
+        public void WriteShortArray(short[] val)
         {
             int cnt = val.Length << 1;
 
@@ -152,7 +1063,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
         }
 
         /** <inheritdoc /> */
-        public override short[] ReadShortArray(int cnt)
+        public short[] ReadShortArray(int cnt)
         {
             int cnt0 = cnt << 1;
 
@@ -166,7 +1077,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
 
         /** <inheritdoc /> */
         [SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
-        public override void WriteCharArray(char[] val)
+        public void WriteCharArray(char[] val)
         {
             int cnt = val.Length << 1;
 
@@ -179,7 +1090,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
         }
 
         /** <inheritdoc /> */
-        public override char[] ReadCharArray(int cnt)
+        public char[] ReadCharArray(int cnt)
         {
             int cnt0 = cnt << 1;
 
@@ -192,7 +1103,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
         }
 
         /** <inheritdoc /> */
-        public override void WriteInt(int val)
+        public void WriteInt(int val)
         {
             int pos0 = EnsureWriteCapacityAndShift(4);
 
@@ -203,7 +1114,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
         }
 
         /** <inheritdoc /> */
-        public override void WriteInt(int writePos, int val)
+        public void WriteInt(int writePos, int val)
         {
             EnsureWriteCapacity(writePos + 4);
 
@@ -214,7 +1125,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
         }
 
         /** <inheritdoc /> */
-        public override int ReadInt()
+        public int ReadInt()
         {
             int pos0 = EnsureReadCapacityAndShift(4);
 
@@ -226,7 +1137,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
 
         /** <inheritdoc /> */
         [SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
-        public override void WriteIntArray(int[] val)
+        public void WriteIntArray(int[] val)
         {
             int cnt = val.Length << 2;
 
@@ -239,7 +1150,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
         }
 
         /** <inheritdoc /> */
-        public override int[] ReadIntArray(int cnt)
+        public int[] ReadIntArray(int cnt)
         {
             int cnt0 = cnt << 2;
 
@@ -253,7 +1164,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
 
         /** <inheritdoc /> */
         [SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
-        public override void WriteFloatArray(float[] val)
+        public void WriteFloatArray(float[] val)
         {
             int cnt = val.Length << 2;
 
@@ -266,7 +1177,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
         }
 
         /** <inheritdoc /> */
-        public override float[] ReadFloatArray(int cnt)
+        public float[] ReadFloatArray(int cnt)
         {
             int cnt0 = cnt << 2;
 
@@ -279,7 +1190,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
         }
 
         /** <inheritdoc /> */
-        public override void WriteLong(long val)
+        public void WriteLong(long val)
         {
             int pos0 = EnsureWriteCapacityAndShift(8);
 
@@ -290,7 +1201,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
         }
 
         /** <inheritdoc /> */
-        public override long ReadLong()
+        public long ReadLong()
         {
             int pos0 = EnsureReadCapacityAndShift(8);
 
@@ -302,7 +1213,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
 
         /** <inheritdoc /> */
         [SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
-        public override void WriteLongArray(long[] val)
+        public void WriteLongArray(long[] val)
         {
             int cnt = val.Length << 3;
 
@@ -315,7 +1226,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
         }
 
         /** <inheritdoc /> */
-        public override long[] ReadLongArray(int cnt)
+        public long[] ReadLongArray(int cnt)
         {
             int cnt0 = cnt << 3;
 
@@ -329,7 +1240,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
 
         /** <inheritdoc /> */
         [SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
-        public override void WriteDoubleArray(double[] val)
+        public void WriteDoubleArray(double[] val)
         {
             int cnt = val.Length << 3;
 
@@ -342,7 +1253,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
         }
 
         /** <inheritdoc /> */
-        public override double[] ReadDoubleArray(int cnt)
+        public double[] ReadDoubleArray(int cnt)
         {
             int cnt0 = cnt << 3;
 
@@ -355,7 +1266,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
         }
 
         /** <inheritdoc /> */
-        public override int WriteString(char* chars, int charCnt, int byteCnt, Encoding encoding)
+        public int WriteString(char* chars, int charCnt, int byteCnt, Encoding encoding)
         {
             int pos0 = EnsureWriteCapacityAndShift(byteCnt);
 
@@ -370,9 +1281,9 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
         }
 
         /** <inheritdoc /> */
-        public override void Write(byte* src, int cnt)
+        public void Write(byte* src, int cnt)
         {
-            EnsureWriteCapacity(Pos + cnt);
+            EnsureWriteCapacity(_pos + cnt);
 
             fixed (byte* data0 = _data)
             {
@@ -383,7 +1294,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
         }
 
         /** <inheritdoc /> */
-        public override void Read(byte* dest, int cnt)
+        public void Read(byte* dest, int cnt)
         {
             fixed (byte* data0 = _data)
             {
@@ -392,36 +1303,30 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
         }
 
         /** <inheritdoc /> */
-        public override int Remaining
-        {
-            get { return _data.Length - Pos; }
-        }
-
-        /** <inheritdoc /> */
-        public override byte[] GetArray()
+        public byte[] GetArray()
         {
             return _data;
         }
 
         /** <inheritdoc /> */
-        public override byte[] GetArrayCopy()
+        public byte[] GetArrayCopy()
         {
-            byte[] copy = new byte[Pos];
+            byte[] copy = new byte[_pos];
 
-            Buffer.BlockCopy(_data, 0, copy, 0, Pos);
+            Buffer.BlockCopy(_data, 0, copy, 0, _pos);
 
             return copy;
         }
 
         /** <inheritdoc /> */
-        public override bool IsSameArray(byte[] arr)
+        public bool IsSameArray(byte[] arr)
         {
             return _data == arr;
         }
 
         /** <inheritdoc /> */
         [SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
-        public override T Apply<TArg, T>(IBinaryStreamProcessor<TArg, T> proc, TArg arg)
+        public T Apply<TArg, T>(IBinaryStreamProcessor<TArg, T> proc, TArg arg)
         {
             Debug.Assert(proc != null);
 
@@ -431,22 +1336,11 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
             }
         }
 
-        /** <inheritdoc /> */
-        protected override void Dispose(bool disposing)
-        {
-            // No-op.
-        }
-
         /// <summary>
-        /// Internal array.
+        /// Ensure capacity for write.
         /// </summary>
-        internal byte[] InternalArray
-        {
-            get { return _data; }
-        }
-
-        /** <inheritdoc /> */
-        protected override void EnsureWriteCapacity(int cnt)
+        /// <param name="cnt">Bytes count.</param>
+        private void EnsureWriteCapacity(int cnt)
         {
             if (cnt > _data.Length)
             {
@@ -462,12 +1356,16 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
             }
         }
 
-        /** <inheritdoc /> */
-        protected override void EnsureReadCapacity(int cnt)
+        /// <summary>
+        /// Ensure capacity for read.
+        /// </summary>
+        /// <param name="cnt">Bytes count.</param>
+        /// <exception cref="EndOfStreamException">Fewer than <paramref name="cnt" /> bytes remain.</exception>
+        private void EnsureReadCapacity(int cnt)
         {
-            if (_data.Length - Pos < cnt)
+            if (_data.Length - _pos < cnt)
                 throw new EndOfStreamException("Not enough data in stream [expected=" + cnt +
-                    ", remaining=" + (_data.Length - Pos) + ']');
+                                               ", remaining=" + (_data.Length - _pos) + ']');
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b8b7c508/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryStreamBase.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryStreamBase.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryStreamBase.cs
deleted file mode 100644
index 0b855f8..0000000
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryStreamBase.cs
+++ /dev/null
@@ -1,1249 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache.Ignite.Core.Impl.Binary.IO
-{
-    using System;
-    using System.IO;
-    using System.Text;
-    using Apache.Ignite.Core.Impl.Memory;
-
-    /// <summary>
-    /// Base class for managed and unmanaged data streams.
-    /// </summary>
-    internal abstract unsafe class BinaryStreamBase : IBinaryStream
-    {
-        /** Byte: zero. */
-        private const byte ByteZero = 0;
-
-        /** Byte: one. */
-        private const byte ByteOne = 1;
-
-        /** LITTLE_ENDIAN flag. */
-        private static readonly bool LittleEndian = BitConverter.IsLittleEndian;
-
-        /** Position. */
-        protected int Pos;
-
-        /** Disposed flag. */
-        private bool _disposed;
-
-        /// <summary>
-        /// Write byte.
-        /// </summary>
-        /// <param name="val">Byte value.</param>
-        public abstract void WriteByte(byte val);
-
-        /// <summary>
-        /// Read byte.
-        /// </summary>
-        /// <returns>
-        /// Byte value.
-        /// </returns>
-        public abstract byte ReadByte();
-
-        /// <summary>
-        /// Write byte array.
-        /// </summary>
-        /// <param name="val">Byte array.</param>
-        public abstract void WriteByteArray(byte[] val);
-
-        /// <summary>
-        /// Internal routine to write byte array.
-        /// </summary>
-        /// <param name="val">Byte array.</param>
-        /// <param name="data">Data pointer.</param>
-        protected static void WriteByteArray0(byte[] val, byte* data)
-        {
-            fixed (byte* val0 = val)
-            {
-                CopyMemory(val0, data, val.Length);
-            }
-        }
-
-        /// <summary>
-        /// Read byte array.
-        /// </summary>
-        /// <param name="cnt">Count.</param>
-        /// <returns>
-        /// Byte array.
-        /// </returns>
-        public abstract byte[] ReadByteArray(int cnt);
-
-        /// <summary>
-        /// Internal routine to read byte array.
-        /// </summary>
-        /// <param name="len">Array length.</param>
-        /// <param name="data">Data pointer.</param>
-        /// <returns>Byte array</returns>
-        protected static byte[] ReadByteArray0(int len, byte* data)
-        {
-            byte[] res = new byte[len];
-
-            fixed (byte* res0 = res)
-            {
-                CopyMemory(data, res0, len);
-            }
-
-            return res;
-        }
-
-        /// <summary>
-        /// Write bool.
-        /// </summary>
-        /// <param name="val">Bool value.</param>
-        public void WriteBool(bool val)
-        {
-            WriteByte(val ? ByteOne : ByteZero);
-        }
-
-        /// <summary>
-        /// Read bool.
-        /// </summary>
-        /// <returns>
-        /// Bool value.
-        /// </returns>
-        public bool ReadBool()
-        {
-            return ReadByte() == ByteOne;
-        }
-
-        /// <summary>
-        /// Write bool array.
-        /// </summary>
-        /// <param name="val">Bool array.</param>
-        public abstract void WriteBoolArray(bool[] val);
-
-        /// <summary>
-        /// Internal routine to write bool array.
-        /// </summary>
-        /// <param name="val">Bool array.</param>
-        /// <param name="data">Data pointer.</param>
-        protected static void WriteBoolArray0(bool[] val, byte* data)
-        {
-            fixed (bool* val0 = val)
-            {
-                CopyMemory((byte*)val0, data, val.Length);
-            }
-        }
-
-        /// <summary>
-        /// Read bool array.
-        /// </summary>
-        /// <param name="cnt">Count.</param>
-        /// <returns>
-        /// Bool array.
-        /// </returns>
-        public abstract bool[] ReadBoolArray(int cnt);
-
-        /// <summary>
-        /// Internal routine to read bool array.
-        /// </summary>
-        /// <param name="len">Array length.</param>
-        /// <param name="data">Data pointer.</param>
-        /// <returns>Bool array</returns>
-        protected static bool[] ReadBoolArray0(int len, byte* data)
-        {
-            bool[] res = new bool[len];
-
-            fixed (bool* res0 = res)
-            {
-                CopyMemory(data, (byte*)res0, len);
-            }
-
-            return res;
-        }
-
-        /// <summary>
-        /// Write short.
-        /// </summary>
-        /// <param name="val">Short value.</param>
-        public abstract void WriteShort(short val);
-
-        /// <summary>
-        /// Internal routine to write short value.
-        /// </summary>
-        /// <param name="val">Short value.</param>
-        /// <param name="data">Data pointer.</param>
-        protected static void WriteShort0(short val, byte* data)
-        {
-            if (LittleEndian)
-                *((short*)data) = val;
-            else
-            {
-                byte* valPtr = (byte*)&val;
-
-                data[0] = valPtr[1];
-                data[1] = valPtr[0];
-            }
-        }
-
-        /// <summary>
-        /// Read short.
-        /// </summary>
-        /// <returns>
-        /// Short value.
-        /// </returns>
-        public abstract short ReadShort();
-
-        /// <summary>
-        /// Internal routine to read short value.
-        /// </summary>
-        /// <param name="data">Data pointer.</param>
-        /// <returns>Short value</returns>
-        protected static short ReadShort0(byte* data)
-        {
-            short val;
-
-            if (LittleEndian)
-                val = *((short*)data);
-            else
-            {
-                byte* valPtr = (byte*)&val;
-
-                valPtr[0] = data[1];
-                valPtr[1] = data[0];
-            }
-
-            return val;
-        }
-
-        /// <summary>
-        /// Write short array.
-        /// </summary>
-        /// <param name="val">Short array.</param>
-        public abstract void WriteShortArray(short[] val);
-
-        /// <summary>
-        /// Internal routine to write short array.
-        /// </summary>
-        /// <param name="val">Short array.</param>
-        /// <param name="data">Data pointer.</param>
-        /// <param name="cnt">Bytes count.</param>
-        protected static void WriteShortArray0(short[] val, byte* data, int cnt)
-        {
-            if (LittleEndian)
-            {
-                fixed (short* val0 = val)
-                {
-                    CopyMemory((byte*)val0, data, cnt);
-                }
-            }
-            else
-            {
-                byte* curPos = data;
-
-                for (int i = 0; i < val.Length; i++)
-                {
-                    short val0 = val[i];
-
-                    byte* valPtr = (byte*)&(val0);
-                    
-                    *curPos++ = valPtr[1];
-                    *curPos++ = valPtr[0];
-                }
-            }
-        }
-
-        /// <summary>
-        /// Read short array.
-        /// </summary>
-        /// <param name="cnt">Count.</param>
-        /// <returns>
-        /// Short array.
-        /// </returns>
-        public abstract short[] ReadShortArray(int cnt);
-
-        /// <summary>
-        /// Internal routine to read short array.
-        /// </summary>
-        /// <param name="len">Array length.</param>
-        /// <param name="data">Data pointer.</param>
-        /// <param name="cnt">Bytes count.</param>
-        /// <returns>Short array</returns>
-        protected static short[] ReadShortArray0(int len, byte* data, int cnt)
-        {
-            short[] res = new short[len];
-
-            if (LittleEndian)
-            {
-                fixed (short* res0 = res)
-                {
-                    CopyMemory(data, (byte*)res0, cnt);
-                }
-            }
-            else
-            {
-                for (int i = 0; i < len; i++)
-                {
-                    short val;
-
-                    byte* valPtr = (byte*)&val;
-
-                    valPtr[1] = *data++;
-                    valPtr[0] = *data++;
-
-                    res[i] = val;
-                }
-            }
-
-            return res;
-        }
-
-        /// <summary>
-        /// Write char.
-        /// </summary>
-        /// <param name="val">Char value.</param>
-        public void WriteChar(char val)
-        {
-            WriteShort(*(short*)(&val));
-        }
-
-        /// <summary>
-        /// Read char.
-        /// </summary>
-        /// <returns>
-        /// Char value.
-        /// </returns>
-        public char ReadChar()
-        {
-            short val = ReadShort();
-
-            return *(char*)(&val);
-        }
-
-        /// <summary>
-        /// Write char array.
-        /// </summary>
-        /// <param name="val">Char array.</param>
-        public abstract void WriteCharArray(char[] val);
-
-        /// <summary>
-        /// Internal routine to write char array.
-        /// </summary>
-        /// <param name="val">Char array.</param>
-        /// <param name="data">Data pointer.</param>
-        /// <param name="cnt">Bytes count.</param>
-        protected static void WriteCharArray0(char[] val, byte* data, int cnt)
-        {
-            if (LittleEndian)
-            {
-                fixed (char* val0 = val)
-                {
-                    CopyMemory((byte*)val0, data, cnt);
-                }
-            }
-            else
-            {
-                byte* curPos = data;
-
-                for (int i = 0; i < val.Length; i++)
-                {
-                    char val0 = val[i];
-
-                    byte* valPtr = (byte*)&(val0);
-
-                    *curPos++ = valPtr[1];
-                    *curPos++ = valPtr[0];
-                }
-            }
-        }
-
-        /// <summary>
-        /// Read char array.
-        /// </summary>
-        /// <param name="cnt">Count.</param>
-        /// <returns>
-        /// Char array.
-        /// </returns>
-        public abstract char[] ReadCharArray(int cnt);
-
-        /// <summary>
-        /// Internal routine to read char array.
-        /// </summary>
-        /// <param name="len">Count.</param>
-        /// <param name="data">Data pointer.</param>
-        /// <param name="cnt">Bytes count.</param>
-        /// <returns>Char array</returns>
-        protected static char[] ReadCharArray0(int len, byte* data, int cnt)
-        {
-            char[] res = new char[len];
-
-            if (LittleEndian)
-            {
-                fixed (char* res0 = res)
-                {
-                    CopyMemory(data, (byte*)res0, cnt);
-                }
-            }
-            else
-            {
-                for (int i = 0; i < len; i++)
-                {
-                    char val;
-
-                    byte* valPtr = (byte*)&val;
-
-                    valPtr[1] = *data++;
-                    valPtr[0] = *data++;
-
-                    res[i] = val;
-                }
-            }
-
-            return res;
-        }
-
-        /// <summary>
-        /// Write int.
-        /// </summary>
-        /// <param name="val">Int value.</param>
-        public abstract void WriteInt(int val);
-
-        /// <summary>
-        /// Write int to specific position.
-        /// </summary>
-        /// <param name="writePos">Position.</param>
-        /// <param name="val">Value.</param>
-        public abstract void WriteInt(int writePos, int val);
-
-        /// <summary>
-        /// Internal routine to write int value.
-        /// </summary>
-        /// <param name="val">Int value.</param>
-        /// <param name="data">Data pointer.</param>
-        protected static void WriteInt0(int val, byte* data)
-        {
-            if (LittleEndian)
-                *((int*)data) = val;
-            else
-            {
-                byte* valPtr = (byte*)&val;
-
-                data[0] = valPtr[3];
-                data[1] = valPtr[2];
-                data[2] = valPtr[1];
-                data[3] = valPtr[0];
-            }
-        }
-
-        /// <summary>
-        /// Read int.
-        /// </summary>
-        /// <returns>
-        /// Int value.
-        /// </returns>
-        public abstract int ReadInt();
-
-        /// <summary>
-        /// Internal routine to read int value.
-        /// </summary>
-        /// <param name="data">Data pointer.</param>
-        /// <returns>Int value</returns>
-        protected static int ReadInt0(byte* data) {
-            int val;
-
-            if (LittleEndian)
-                val = *((int*)data);
-            else
-            {
-                byte* valPtr = (byte*)&val;
-
-                valPtr[0] = data[3];
-                valPtr[1] = data[2];
-                valPtr[2] = data[1];
-                valPtr[3] = data[0];
-            }
-            
-            return val;
-        }
-
-        /// <summary>
-        /// Write int array.
-        /// </summary>
-        /// <param name="val">Int array.</param>
-        public abstract void WriteIntArray(int[] val);
-
-        /// <summary>
-        /// Internal routine to write int array.
-        /// </summary>
-        /// <param name="val">Int array.</param>
-        /// <param name="data">Data pointer.</param>
-        /// <param name="cnt">Bytes count.</param>
-        protected static void WriteIntArray0(int[] val, byte* data, int cnt)
-        {
-            if (LittleEndian)
-            {
-                fixed (int* val0 = val)
-                {
-                    CopyMemory((byte*)val0, data, cnt);
-                }
-            }
-            else
-            {
-                byte* curPos = data;
-
-                for (int i = 0; i < val.Length; i++)
-                {
-                    int val0 = val[i];
-
-                    byte* valPtr = (byte*)&(val0);
-
-                    *curPos++ = valPtr[3];
-                    *curPos++ = valPtr[2];
-                    *curPos++ = valPtr[1];
-                    *curPos++ = valPtr[0];
-                }
-            }
-        }
-
-        /// <summary>
-        /// Read int array.
-        /// </summary>
-        /// <param name="cnt">Count.</param>
-        /// <returns>
-        /// Int array.
-        /// </returns>
-        public abstract int[] ReadIntArray(int cnt);
-
-        /// <summary>
-        /// Internal routine to read int array.
-        /// </summary>
-        /// <param name="len">Count.</param>
-        /// <param name="data">Data pointer.</param>
-        /// <param name="cnt">Bytes count.</param>
-        /// <returns>Int array</returns>
-        protected static int[] ReadIntArray0(int len, byte* data, int cnt)
-        {
-            int[] res = new int[len];
-
-            if (LittleEndian)
-            {
-                fixed (int* res0 = res)
-                {
-                    CopyMemory(data, (byte*)res0, cnt);
-                }
-            }
-            else
-            {
-                for (int i = 0; i < len; i++)
-                {
-                    int val;
-
-                    byte* valPtr = (byte*)&val;
-
-                    valPtr[3] = *data++;
-                    valPtr[2] = *data++;
-                    valPtr[1] = *data++;
-                    valPtr[0] = *data++;
-
-                    res[i] = val;
-                }
-            }
-
-            return res;
-        }
-
-        /// <summary>
-        /// Write float.
-        /// </summary>
-        /// <param name="val">Float value.</param>
-        public void WriteFloat(float val)
-        {
-            int val0 = *(int*)(&val);
-
-            WriteInt(val0);
-        }
-
-        /// <summary>
-        /// Read float.
-        /// </summary>
-        /// <returns>
-        /// Float value.
-        /// </returns>
-        public float ReadFloat()
-        {
-            int val = ReadInt();
-
-            return BinaryUtils.IntToFloatBits(val);
-        }
-
-        /// <summary>
-        /// Write float array.
-        /// </summary>
-        /// <param name="val">Float array.</param>
-        public abstract void WriteFloatArray(float[] val);
-
-        /// <summary>
-        /// Internal routine to write float array.
-        /// </summary>
-        /// <param name="val">Int array.</param>
-        /// <param name="data">Data pointer.</param>
-        /// <param name="cnt">Bytes count.</param>
-        protected static void WriteFloatArray0(float[] val, byte* data, int cnt)
-        {
-            if (LittleEndian)
-            {
-                fixed (float* val0 = val)
-                {
-                    CopyMemory((byte*)val0, data, cnt);
-                }
-            }
-            else
-            {
-                byte* curPos = data;
-
-                for (int i = 0; i < val.Length; i++)
-                {
-                    float val0 = val[i];
-
-                    byte* valPtr = (byte*)&(val0);
-
-                    *curPos++ = valPtr[3];
-                    *curPos++ = valPtr[2];
-                    *curPos++ = valPtr[1];
-                    *curPos++ = valPtr[0];
-                }
-            }
-        }
-
-        /// <summary>
-        /// Read float array.
-        /// </summary>
-        /// <param name="cnt">Count.</param>
-        /// <returns>
-        /// Float array.
-        /// </returns>
-        public abstract float[] ReadFloatArray(int cnt);
-
-        /// <summary>
-        /// Internal routine to read float array.
-        /// </summary>
-        /// <param name="len">Count.</param>
-        /// <param name="data">Data pointer.</param>
-        /// <param name="cnt">Bytes count.</param>
-        /// <returns>Float array</returns>
-        protected static float[] ReadFloatArray0(int len, byte* data, int cnt)
-        {
-            float[] res = new float[len];
-
-            if (LittleEndian)
-            {
-                fixed (float* res0 = res)
-                {
-                    CopyMemory(data, (byte*)res0, cnt);
-                }
-            }
-            else
-            {
-                for (int i = 0; i < len; i++)
-                {
-                    int val;
-
-                    byte* valPtr = (byte*)&val;
-
-                    valPtr[3] = *data++;
-                    valPtr[2] = *data++;
-                    valPtr[1] = *data++;
-                    valPtr[0] = *data++;
-
-                    res[i] = val;
-                }
-            }
-
-            return res;
-        }
-
-        /// <summary>
-        /// Write long.
-        /// </summary>
-        /// <param name="val">Long value.</param>
-        public abstract void WriteLong(long val);
-
-        /// <summary>
-        /// Internal routine to write long value.
-        /// </summary>
-        /// <param name="val">Long value.</param>
-        /// <param name="data">Data pointer.</param>
-        protected static void WriteLong0(long val, byte* data)
-        {
-            if (LittleEndian)
-                *((long*)data) = val;
-            else
-            {
-                byte* valPtr = (byte*)&val;
-
-                data[0] = valPtr[7];
-                data[1] = valPtr[6];
-                data[2] = valPtr[5];
-                data[3] = valPtr[4];
-                data[4] = valPtr[3];
-                data[5] = valPtr[2];
-                data[6] = valPtr[1];
-                data[7] = valPtr[0];
-            }
-        }
-
-        /// <summary>
-        /// Read long.
-        /// </summary>
-        /// <returns>
-        /// Long value.
-        /// </returns>
-        public abstract long ReadLong();
-
-        /// <summary>
-        /// Internal routine to read long value.
-        /// </summary>
-        /// <param name="data">Data pointer.</param>
-        /// <returns>Long value</returns>
-        protected static long ReadLong0(byte* data)
-        {
-            long val;
-
-            if (LittleEndian)
-                val = *((long*)data);
-            else
-            {
-                byte* valPtr = (byte*)&val;
-
-                valPtr[0] = data[7];
-                valPtr[1] = data[6];
-                valPtr[2] = data[5];
-                valPtr[3] = data[4];
-                valPtr[4] = data[3];
-                valPtr[5] = data[2];
-                valPtr[6] = data[1];
-                valPtr[7] = data[0];
-            }
-
-            return val;
-        }
-
-        /// <summary>
-        /// Write long array.
-        /// </summary>
-        /// <param name="val">Long array.</param>
-        public abstract void WriteLongArray(long[] val);
-
-        /// <summary>
-        /// Internal routine to write long array.
-        /// </summary>
-        /// <param name="val">Long array.</param>
-        /// <param name="data">Data pointer.</param>
-        /// <param name="cnt">Bytes count.</param>
-        protected static void WriteLongArray0(long[] val, byte* data, int cnt)
-        {
-            if (LittleEndian)
-            {
-                fixed (long* val0 = val)
-                {
-                    CopyMemory((byte*)val0, data, cnt);
-                }
-            }
-            else
-            {
-                byte* curPos = data;
-
-                for (int i = 0; i < val.Length; i++)
-                {
-                    long val0 = val[i];
-
-                    byte* valPtr = (byte*)&(val0);
-
-                    *curPos++ = valPtr[7];
-                    *curPos++ = valPtr[6];
-                    *curPos++ = valPtr[5];
-                    *curPos++ = valPtr[4];
-                    *curPos++ = valPtr[3];
-                    *curPos++ = valPtr[2];
-                    *curPos++ = valPtr[1];
-                    *curPos++ = valPtr[0];
-                }
-            }
-        }
-
-        /// <summary>
-        /// Read long array.
-        /// </summary>
-        /// <param name="cnt">Count.</param>
-        /// <returns>
-        /// Long array.
-        /// </returns>
-        public abstract long[] ReadLongArray(int cnt);
-
-        /// <summary>
-        /// Internal routine to read long array.
-        /// </summary>
-        /// <param name="len">Count.</param>
-        /// <param name="data">Data pointer.</param>
-        /// <param name="cnt">Bytes count.</param>
-        /// <returns>Long array</returns>
-        protected static long[] ReadLongArray0(int len, byte* data, int cnt)
-        {
-            long[] res = new long[len];
-
-            if (LittleEndian)
-            {
-                fixed (long* res0 = res)
-                {
-                    CopyMemory(data, (byte*)res0, cnt);
-                }
-            }
-            else
-            {
-                for (int i = 0; i < len; i++)
-                {
-                    long val;
-
-                    byte* valPtr = (byte*)&val;
-
-                    valPtr[7] = *data++;
-                    valPtr[6] = *data++;
-                    valPtr[5] = *data++;
-                    valPtr[4] = *data++;
-                    valPtr[3] = *data++;
-                    valPtr[2] = *data++;
-                    valPtr[1] = *data++;
-                    valPtr[0] = *data++;
-
-                    res[i] = val;
-                }
-            }
-
-            return res;
-        }
-
-        /// <summary>
-        /// Write double.
-        /// </summary>
-        /// <param name="val">Double value.</param>
-        public void WriteDouble(double val)
-        {
-            long val0 = *(long*)(&val);
-
-            WriteLong(val0);
-        }
-
-        /// <summary>
-        /// Read double.
-        /// </summary>
-        /// <returns>
-        /// Double value.
-        /// </returns>
-        public double ReadDouble()
-        {
-            long val = ReadLong();
-
-            return BinaryUtils.LongToDoubleBits(val);
-        }
-
-        /// <summary>
-        /// Write double array.
-        /// </summary>
-        /// <param name="val">Double array.</param>
-        public abstract void WriteDoubleArray(double[] val);
-
-        /// <summary>
-        /// Internal routine to write double array.
-        /// </summary>
-        /// <param name="val">Double array.</param>
-        /// <param name="data">Data pointer.</param>
-        /// <param name="cnt">Bytes count.</param>
-        protected static void WriteDoubleArray0(double[] val, byte* data, int cnt)
-        {
-            if (LittleEndian)
-            {
-                fixed (double* val0 = val)
-                {
-                    CopyMemory((byte*)val0, data, cnt);
-                }
-            }
-            else
-            {
-                byte* curPos = data;
-
-                for (int i = 0; i < val.Length; i++)
-                {
-                    double val0 = val[i];
-
-                    byte* valPtr = (byte*)&(val0);
-
-                    *curPos++ = valPtr[7];
-                    *curPos++ = valPtr[6];
-                    *curPos++ = valPtr[5];
-                    *curPos++ = valPtr[4];
-                    *curPos++ = valPtr[3];
-                    *curPos++ = valPtr[2];
-                    *curPos++ = valPtr[1];
-                    *curPos++ = valPtr[0];
-                }
-            }
-        }
-
-        /// <summary>
-        /// Read double array.
-        /// </summary>
-        /// <param name="cnt">Count.</param>
-        /// <returns>
-        /// Double array.
-        /// </returns>
-        public abstract double[] ReadDoubleArray(int cnt);
-
-        /// <summary>
-        /// Internal routine to read double array.
-        /// </summary>
-        /// <param name="len">Count.</param>
-        /// <param name="data">Data pointer.</param>
-        /// <param name="cnt">Bytes count.</param>
-        /// <returns>Double array</returns>
-        protected static double[] ReadDoubleArray0(int len, byte* data, int cnt)
-        {
-            double[] res = new double[len];
-
-            if (LittleEndian)
-            {
-                fixed (double* res0 = res)
-                {
-                    CopyMemory(data, (byte*)res0, cnt);
-                }
-            }
-            else
-            {
-                for (int i = 0; i < len; i++)
-                {
-                    double val;
-
-                    byte* valPtr = (byte*)&val;
-
-                    valPtr[7] = *data++;
-                    valPtr[6] = *data++;
-                    valPtr[5] = *data++;
-                    valPtr[4] = *data++;
-                    valPtr[3] = *data++;
-                    valPtr[2] = *data++;
-                    valPtr[1] = *data++;
-                    valPtr[0] = *data++;
-
-                    res[i] = val;
-                }
-            }
-
-            return res;
-        }
-
-        /// <summary>
-        /// Write string.
-        /// </summary>
-        /// <param name="chars">Characters.</param>
-        /// <param name="charCnt">Char count.</param>
-        /// <param name="byteCnt">Byte count.</param>
-        /// <param name="encoding">Encoding.</param>
-        /// <returns>
-        /// Amounts of bytes written.
-        /// </returns>
-        public abstract int WriteString(char* chars, int charCnt, int byteCnt, Encoding encoding);
-
-        /// <summary>
-        /// Write arbitrary data.
-        /// </summary>
-        /// <param name="src">Source array.</param>
-        /// <param name="off">Offset</param>
-        /// <param name="cnt">Count.</param>
-        public void Write(byte[] src, int off, int cnt)
-        {
-            fixed (byte* src0 = src)
-            {
-                Write(src0 + off, cnt);
-            }
-        }
-
-        /// <summary>
-        /// Read arbitrary data.
-        /// </summary>
-        /// <param name="dest">Destination array.</param>
-        /// <param name="off">Offset.</param>
-        /// <param name="cnt">Count.</param>
-        /// <returns>
-        /// Amount of bytes read.
-        /// </returns>
-        public void Read(byte[] dest, int off, int cnt)
-        {
-            fixed (byte* dest0 = dest)
-            {
-                Read(dest0 + off, cnt);
-            }
-        }
-
-        /// <summary>
-        /// Write arbitrary data.
-        /// </summary>
-        /// <param name="src">Source.</param>
-        /// <param name="cnt">Count.</param>
-        public abstract void Write(byte* src, int cnt);
-
-        /// <summary>
-        /// Internal write routine.
-        /// </summary>
-        /// <param name="src">Source.</param>
-        /// <param name="cnt">Count.</param>
-        /// <param name="data">Data (dsetination).</param>
-        protected void WriteInternal(byte* src, int cnt, byte* data)
-        {
-            CopyMemory(src, data + Pos, cnt);
-        }
-
-        /// <summary>
-        /// Read arbitrary data.
-        /// </summary>
-        /// <param name="dest">Destination.</param>
-        /// <param name="cnt">Count.</param>
-        /// <returns></returns>
-        public abstract void Read(byte* dest, int cnt);
-
-        /// <summary>
-        /// Internal read routine.
-        /// </summary>
-        /// <param name="src">Source</param>
-        /// <param name="dest">Destination.</param>
-        /// <param name="cnt">Count.</param>
-        /// <returns>Amount of bytes written.</returns>
-        protected void ReadInternal(byte* src, byte* dest, int cnt)
-        {
-            int cnt0 = Math.Min(Remaining, cnt);
-
-            CopyMemory(src + Pos, dest, cnt0);
-
-            ShiftRead(cnt0);
-        }
-
-        /// <summary>
-        /// Position.
-        /// </summary>
-        public int Position
-        {
-            get { return Pos; }
-        }
-
-        /// <summary>
-        /// Gets remaining bytes in the stream.
-        /// </summary>
-        /// <value>
-        ///     Remaining bytes.
-        /// </value>
-        public abstract int Remaining { get; }
-
-        /// <summary>
-        /// Gets underlying array, avoiding copying if possible.
-        /// </summary>
-        /// <returns>
-        /// Underlying array.
-        /// </returns>
-        public abstract byte[] GetArray();
-
-        /// <summary>
-        /// Gets underlying data in a new array.
-        /// </summary>
-        /// <returns>
-        /// New array with data.
-        /// </returns>
-        public abstract byte[] GetArrayCopy();
-
-        /// <summary>
-        /// Check whether array passed as argument is the same as the stream hosts.
-        /// </summary>
-        /// <param name="arr">Array.</param>
-        /// <returns>
-        ///   <c>True</c> if they are same.
-        /// </returns>
-        public abstract bool IsSameArray(byte[] arr);
-        
-        /// <summary>
-        /// Seek to the given position.
-        /// </summary>
-        /// <param name="offset">Offset.</param>
-        /// <param name="origin">Seek origin.</param>
-        /// <returns>
-        /// Position.
-        /// </returns>
-        /// <exception cref="System.ArgumentException">
-        /// Unsupported seek origin:  + origin
-        /// or
-        /// Seek before origin:  + newPos
-        /// </exception>
-        public int Seek(int offset, SeekOrigin origin)
-        {
-            int newPos;
-
-            switch (origin)
-            {
-                case SeekOrigin.Begin:
-                    {
-                        newPos = offset;
-
-                        break;
-                    }
-
-                case SeekOrigin.Current:
-                    {
-                        newPos = Pos + offset;
-
-                        break;
-                    }
-
-                default:
-                    throw new ArgumentException("Unsupported seek origin: " + origin);
-            }
-
-            if (newPos < 0)
-                throw new ArgumentException("Seek before origin: " + newPos);
-
-            EnsureWriteCapacity(newPos);
-
-            Pos = newPos;
-
-            return Pos;
-        }
-
-        /// <summary>
-        /// Returns a hash code for the specified byte range.
-        /// </summary>
-        public abstract T Apply<TArg, T>(IBinaryStreamProcessor<TArg, T> proc, TArg arg);
-
-        /// <summary>
-        /// Flushes the data to underlying storage.
-        /// </summary>
-        public void Flush()
-        {
-            // No-op.
-        }
-
-        /** <inheritdoc /> */
-        public void Dispose()
-        {
-            if (_disposed)
-                return;
-
-            Dispose(true);
-
-            GC.SuppressFinalize(this);
-
-            _disposed = true;
-        }
-
-        /// <summary>
-        /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
-        /// </summary>
-        protected abstract void Dispose(bool disposing);
-
-        /// <summary>
-        /// Ensure capacity for write.
-        /// </summary>
-        /// <param name="cnt">Bytes count.</param>
-        protected abstract void EnsureWriteCapacity(int cnt);
-
-        /// <summary>
-        /// Ensure capacity for write and shift position.
-        /// </summary>
-        /// <param name="cnt">Bytes count.</param>
-        /// <returns>Position before shift.</returns>
-        protected int EnsureWriteCapacityAndShift(int cnt)
-        {
-            int pos0 = Pos;
-
-            EnsureWriteCapacity(Pos + cnt);
-
-            ShiftWrite(cnt);
-
-            return pos0;
-        }
-
-        /// <summary>
-        /// Ensure capacity for read.
-        /// </summary>
-        /// <param name="cnt">Bytes count.</param>
-        protected abstract void EnsureReadCapacity(int cnt);
-
-        /// <summary>
-        /// Ensure capacity for read and shift position.
-        /// </summary>
-        /// <param name="cnt">Bytes count.</param>
-        /// <returns>Position before shift.</returns>
-        protected int EnsureReadCapacityAndShift(int cnt)
-        {
-            int pos0 = Pos;
-
-            EnsureReadCapacity(cnt);
-
-            ShiftRead(cnt);
-
-            return pos0;
-        }
-
-        /// <summary>
-        /// Shift position due to write
-        /// </summary>
-        /// <param name="cnt">Bytes count.</param>
-        protected void ShiftWrite(int cnt)
-        {
-            Pos += cnt;
-        }
-
-        /// <summary>
-        /// Shift position due to read.
-        /// </summary>
-        /// <param name="cnt">Bytes count.</param>
-        private void ShiftRead(int cnt)
-        {
-            Pos += cnt;
-        }
-
-        /// <summary>
-        /// Calculate new capacity.
-        /// </summary>
-        /// <param name="curCap">Current capacity.</param>
-        /// <param name="reqCap">Required capacity.</param>
-        /// <returns>New capacity.</returns>
-        protected static int Capacity(int curCap, int reqCap)
-        {
-            int newCap;
-
-            if (reqCap < 256)
-                newCap = 256;
-            else
-            {
-                newCap = curCap << 1;
-
-                if (newCap < reqCap)
-                    newCap = reqCap;
-            }
-
-            return newCap;
-        }
-
-        /// <summary>
-        /// Unsafe memory copy routine.
-        /// </summary>
-        /// <param name="src">Source.</param>
-        /// <param name="dest">Destination.</param>
-        /// <param name="len">Length.</param>
-        private static void CopyMemory(byte* src, byte* dest, int len)
-        {
-            PlatformMemoryUtils.CopyMemory(src, dest, len);
-        }
-    }
-}


[5/6] ignite git commit: Merge remote-tracking branch 'remotes/origin/master' into ignite-3478

Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/master' into ignite-3478


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/23742962
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/23742962
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/23742962

Branch: refs/heads/ignite-3478
Commit: 23742962f8d539aac33a7ac953f09a1407b330e9
Parents: 970cf47 b8b7c50
Author: sboikov <sb...@gridgain.com>
Authored: Thu Oct 12 13:36:52 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Oct 12 13:36:52 2017 +0300

----------------------------------------------------------------------
 .../internal/MarshallerMappingFileStore.java    |   15 +-
 .../service/GridServiceProcessor.java           |   11 +-
 .../nio/GridAbstractCommunicationClient.java    |    2 +-
 .../communication/tcp/TcpCommunicationSpi.java  |  378 +++---
 .../IgniteMarshallerCacheFSRestoreTest.java     |   71 +-
 .../Apache.Ignite.Core.csproj                   |    1 -
 .../Impl/Binary/Io/BinaryHeapStream.cs          | 1018 +++++++++++++-
 .../Impl/Binary/Io/BinaryStreamBase.cs          | 1249 ------------------
 8 files changed, 1237 insertions(+), 1508 deletions(-)
----------------------------------------------------------------------