You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/10/12 10:43:23 UTC
[1/6] ignite git commit: IGNITE-6545: Failure during Ignite
Service.cancel() can break normal shutdown process. This closes #2807.
Repository: ignite
Updated Branches:
refs/heads/ignite-3478 970cf47a5 -> f29d4bc50
IGNITE-6545: Failure during Ignite Service.cancel() can break normal shutdown process. This closes #2807.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8ffa1099
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8ffa1099
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8ffa1099
Branch: refs/heads/ignite-3478
Commit: 8ffa1099e2afd14052f7c91be822b2aa3f5f2a8d
Parents: 0f3546a
Author: AMRepo <an...@gmail.com>
Authored: Tue Oct 10 11:57:20 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Wed Oct 11 12:00:26 2017 +0300
----------------------------------------------------------------------
.../processors/service/GridServiceProcessor.java | 11 ++++++++++-
1 file changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/8ffa1099/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 9272760..6f1dfc7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -316,7 +316,16 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
Service svc = ctx.service();
if (svc != null)
- svc.cancel(ctx);
+ try {
+ svc.cancel(ctx);
+ }
+ catch (Throwable e) {
+ log.error("Failed to cancel service (ignoring) [name=" + ctx.name() +
+ ", execId=" + ctx.executionId() + ']', e);
+
+ if (e instanceof Error)
+ throw e;
+ }
ctx.executor().shutdownNow();
}
[6/6] ignite git commit: ignite-3478 Fixed query ack
Posted by sb...@apache.org.
ignite-3478 Fixed query ack
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f29d4bc5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f29d4bc5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f29d4bc5
Branch: refs/heads/ignite-3478
Commit: f29d4bc50801c530ef856d168fb637b0fad1c27b
Parents: 2374296
Author: sboikov <sb...@gridgain.com>
Authored: Thu Oct 12 13:43:07 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Oct 12 13:43:07 2017 +0300
----------------------------------------------------------------------
.../cache/mvcc/CacheCoordinatorsProcessor.java | 12 ++++++++++--
1 file changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f29d4bc5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
index 9f9a7a3..85dde15 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
@@ -261,7 +261,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
}
crd = crdNode != null ? new
- MvccCoordinator(crdNode.id(), topVer, new AffinityTopologyVersion(topVer, 0)) : null;
+ MvccCoordinator(crdNode.id(), coordinatorVersion(topVer), new AffinityTopologyVersion(topVer, 0)) : null;
if (crd != null)
log.info("Assigned mvcc coordinator [crd=" + crd + ", crdNode=" + crdNode +']');
@@ -274,6 +274,14 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
}
/**
+ * @param topVer Topology version.
+ * @return Coordinator version.
+ */
+ private long coordinatorVersion(long topVer) {
+ return topVer + ctx.discovery().gridStartTime();
+ }
+
+ /**
* @param log Logger.
*/
public void dumpStatistics(IgniteLogger log) {
@@ -1022,7 +1030,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
log.info("Initialize local node as mvcc coordinator [node=" + ctx.localNodeId() +
", topVer=" + topVer + ']');
- crdVer = topVer.topologyVersion() + ctx.discovery().gridStartTime();
+ crdVer = coordinatorVersion(topVer.topologyVersion());
prevCrdQueries.init(activeQueries, discoCache, ctx.discovery());
[3/6] ignite git commit: IGNITE-6536 Node fails when detects mapping
storage corruption
Posted by sb...@apache.org.
IGNITE-6536 Node fails when detects mapping storage corruption
Signed-off-by: Andrey Gura <ag...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5490c7d9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5490c7d9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5490c7d9
Branch: refs/heads/ignite-3478
Commit: 5490c7d927daca2d64f108b6c2eafe03bbd6f54e
Parents: b0158fb
Author: Sergey Chugunov <se...@gmail.com>
Authored: Wed Oct 11 15:33:23 2017 +0300
Committer: Andrey Gura <ag...@apache.org>
Committed: Wed Oct 11 15:34:06 2017 +0300
----------------------------------------------------------------------
.../internal/MarshallerMappingFileStore.java | 15 +++--
.../IgniteMarshallerCacheFSRestoreTest.java | 71 +++++++++++++++++++-
2 files changed, 77 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5490c7d9/modules/core/src/main/java/org/apache/ignite/internal/MarshallerMappingFileStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerMappingFileStore.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerMappingFileStore.java
index eabbdb8..59a99b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerMappingFileStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerMappingFileStore.java
@@ -167,16 +167,19 @@ final class MarshallerMappingFileStore {
try (FileInputStream in = new FileInputStream(file)) {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
- String className = reader.readLine();
+ String clsName = reader.readLine();
- marshCtx.registerClassNameLocally(platformId, typeId, className);
+ if (clsName == null) {
+ throw new IgniteCheckedException("Class name is null for [platformId=" + platformId +
+ ", typeId=" + typeId + "], marshaller mappings storage is broken. " +
+ "Clean up marshaller directory (<work_dir>/marshaller) and restart the node.");
+ }
+
+ marshCtx.registerClassNameLocally(platformId, typeId, clsName);
}
}
catch (IOException e) {
- throw new IgniteCheckedException("Reading marshaller mapping from file "
- + name
- + " failed."
- , e);
+ throw new IgniteCheckedException("Reading marshaller mapping from file " + name + " failed.", e);
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5490c7d9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheFSRestoreTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheFSRestoreTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheFSRestoreTest.java
index 21a3e43..ac15971 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheFSRestoreTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheFSRestoreTest.java
@@ -23,13 +23,16 @@ import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Map;
+import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.PersistentStoreConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.processors.marshaller.MappingProposedMessage;
@@ -47,6 +50,9 @@ public class IgniteMarshallerCacheFSRestoreTest extends GridCommonAbstractTest {
/** */
private volatile boolean isDuplicateObserved = true;
+ /** */
+ private boolean isPersistenceEnabled;
+
/**
*
*/
@@ -67,6 +73,7 @@ public class IgniteMarshallerCacheFSRestoreTest extends GridCommonAbstractTest {
}
}
+ /** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
@@ -75,13 +82,17 @@ public class IgniteMarshallerCacheFSRestoreTest extends GridCommonAbstractTest {
cfg.setDiscoverySpi(discoSpi);
- CacheConfiguration singleCacheConfig = new CacheConfiguration()
+ CacheConfiguration singleCacheCfg = new CacheConfiguration()
.setName(DEFAULT_CACHE_NAME)
.setCacheMode(CacheMode.PARTITIONED)
.setBackups(1)
.setAtomicityMode(CacheAtomicityMode.ATOMIC);
- cfg.setCacheConfiguration(singleCacheConfig);
+ cfg.setCacheConfiguration(singleCacheCfg);
+
+ // Persistence must be enabled to verify the case of restoring mappings from the file system.
+ if (isPersistenceEnabled)
+ cfg.setPersistentStoreConfiguration(new PersistentStoreConfiguration());
return cfg;
}
@@ -110,11 +121,14 @@ public class IgniteMarshallerCacheFSRestoreTest extends GridCommonAbstractTest {
* In that case the request must not be marked as duplicate and must be processed in a regular way.
* No hangs must take place.
*
- * @see <a href="https://issues.apache.org/jira/browse/IGNITE-5401">IGNITE-5401</a> Take a look at JIRA ticket for more information about context of this test.
+ * @see <a href="https://issues.apache.org/jira/browse/IGNITE-5401">IGNITE-5401</a> JIRA ticket
+ * provides more information about context of this test.
*
* This test must never hang on proposing of MarshallerMapping.
*/
public void testFileMappingReadAndPropose() throws Exception {
+ isPersistenceEnabled = false;
+
prepareMarshallerFileStore();
IgniteEx ignite0 = startGrid(0);
@@ -162,6 +176,57 @@ public class IgniteMarshallerCacheFSRestoreTest extends GridCommonAbstractTest {
}
}
+ /**
+ * Verifies that a node with a corrupted marshaller mapping store fails on startup
+ * with an appropriate error message.
+ *
+ * @see <a href="https://issues.apache.org/jira/browse/IGNITE-6536">IGNITE-6536</a> JIRA provides more information
+ * about this case.
+ */
+ public void testNodeStartFailsOnCorruptedStorage() throws Exception {
+ isPersistenceEnabled = true;
+
+ Ignite ig = startGrids(3);
+
+ ig.active(true);
+
+ ig.cache(DEFAULT_CACHE_NAME).put(0, new SimpleValue(0, "value0"));
+
+ stopAllGrids();
+
+ corruptMarshallerStorage();
+
+ try {
+ startGrid(0);
+ }
+ catch (IgniteCheckedException e) {
+ verifyException((IgniteCheckedException) e.getCause());
+ }
+ }
+
+ /**
+ * Class name for CustomClass class mapping file gets cleaned up from file system.
+ */
+ private void corruptMarshallerStorage() throws Exception {
+ String marshallerDir = U.defaultWorkDirectory() + File.separator + "marshaller";
+
+ File[] storedMappingsFiles = new File(marshallerDir).listFiles();
+
+ assert storedMappingsFiles.length == 1;
+
+ try (FileOutputStream out = new FileOutputStream(storedMappingsFiles[0])) {
+ out.getChannel().truncate(0);
+ }
+ }
+
+ /** */
+ private void verifyException(IgniteCheckedException e) throws Exception {
+ String msg = e.getMessage();
+
+ if (msg == null || !msg.contains("Class name is null"))
+ throw new Exception("Exception with unexpected message was thrown: " + msg, e);
+ }
+
/** */
private class TestTcpDiscoverySpi extends TcpDiscoverySpi {
[2/6] ignite git commit: IGNITE-6542 Reliably close SocketChannel in
TcpCommunicationSpi.
Posted by sb...@apache.org.
IGNITE-6542 Reliably close SocketChannel in TcpCommunicationSpi.
Also fix forceClose() in GridTcpNioCommunicationClient which became wrong when migrated from int to bool. - Fixes #2787.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b0158fb2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b0158fb2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b0158fb2
Branch: refs/heads/ignite-3478
Commit: b0158fb2ab6de20922cc1b19597c5e17dae0b527
Parents: 8ffa109
Author: Ilya Kasnacheev <il...@gmail.com>
Authored: Wed Oct 11 15:29:04 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Wed Oct 11 15:29:04 2017 +0300
----------------------------------------------------------------------
.../nio/GridAbstractCommunicationClient.java | 2 +-
.../communication/tcp/TcpCommunicationSpi.java | 378 ++++++++++---------
2 files changed, 192 insertions(+), 188 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b0158fb2/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
index 6302d84..ed7e929 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
@@ -59,7 +59,7 @@ public abstract class GridAbstractCommunicationClient implements GridCommunicati
/** {@inheritDoc} */
@Override public void forceClose() {
- closed.set(false);
+ closed.set(true);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/b0158fb2/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 7a54666..a0ee389 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -2894,12 +2894,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
}
try {
- safeHandshake(client,
- null,
- node.id(),
- timeoutHelper.nextTimeoutChunk(connTimeout0),
- null,
- null);
+ safeShmemHandshake(client, node.id(), timeoutHelper.nextTimeoutChunk(connTimeout0));
}
catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) {
client.forceClose();
@@ -3063,7 +3058,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
LinkedHashSet<InetSocketAddress> addrs = nodeAddresses(node);
- boolean conn = false;
GridCommunicationClient client = null;
IgniteCheckedException errs = null;
@@ -3079,7 +3073,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
int lastWaitingTimeout = 1;
- while (!conn) { // Reconnection on handshake timeout.
+ while (client == null) { // Reconnection on handshake timeout.
+ boolean needWait = false;
+
try {
SocketChannel ch = SocketChannel.open();
@@ -3111,7 +3107,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
return null;
}
- Long rcvCnt = null;
+ Long rcvCnt;
Map<Integer, Object> meta = new HashMap<>();
@@ -3132,7 +3128,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
Integer handshakeConnIdx = connIdx;
- rcvCnt = safeHandshake(ch,
+ rcvCnt = safeTcpHandshake(ch,
recoveryDesc,
node.id(),
timeoutHelper.nextTimeoutChunk(connTimeout0),
@@ -3140,34 +3136,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
handshakeConnIdx);
if (rcvCnt == ALREADY_CONNECTED) {
- recoveryDesc.release();
-
return null;
}
else if (rcvCnt == NODE_STOPPING) {
- recoveryDesc.release();
-
throw new ClusterTopologyCheckedException("Remote node started stop procedure: " + node.id());
}
else if (rcvCnt == NEED_WAIT) {
- recoveryDesc.release();
-
- U.closeQuiet(ch);
-
- if (lastWaitingTimeout < 60000)
- lastWaitingTimeout *= 2;
-
- U.sleep(lastWaitingTimeout);
+ needWait = true;
continue;
}
- }
- finally {
- if (recoveryDesc != null && rcvCnt == null)
- recoveryDesc.release();
- }
- try {
meta.put(CONN_IDX_META, connKey);
if (recoveryDesc != null) {
@@ -3179,13 +3158,20 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
GridNioSession ses = nioSrvr.createSession(ch, meta, false, null).get();
client = new GridTcpNioCommunicationClient(connIdx, ses, log);
-
- conn = true;
}
finally {
- if (!conn) {
+ if (client == null) {
+ U.closeQuiet(ch);
+
if (recoveryDesc != null)
recoveryDesc.release();
+
+ if (needWait) {
+ if (lastWaitingTimeout < 60000)
+ lastWaitingTimeout *= 2;
+
+ U.sleep(lastWaitingTimeout);
+ }
}
}
}
@@ -3307,7 +3293,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
}
}
- if (conn)
+ if (client != null)
break;
}
@@ -3362,6 +3348,42 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
* Performs handshake in timeout-safe way.
*
* @param client Client.
+ * @param rmtNodeId Remote node.
+ * @param timeout Timeout for handshake.
+ * @throws IgniteCheckedException If handshake failed or wasn't completed within timeout.
+ */
+ @SuppressWarnings("ThrowFromFinallyBlock")
+ private void safeShmemHandshake(
+ GridCommunicationClient client,
+ UUID rmtNodeId,
+ long timeout
+ ) throws IgniteCheckedException {
+ HandshakeTimeoutObject<GridCommunicationClient> obj = new HandshakeTimeoutObject<>(client,
+ U.currentTimeMillis() + timeout);
+
+ addTimeoutObject(obj);
+
+ try {
+ client.doHandshake(new HandshakeClosure(rmtNodeId));
+ }
+ finally {
+ boolean cancelled = obj.cancel();
+
+ if (cancelled)
+ removeTimeoutObject(obj);
+
+ // Ignoring whatever happened after timeout - reporting only timeout event.
+ if (!cancelled)
+ throw new HandshakeTimeoutException(
+ new IgniteSpiOperationTimeoutException("Failed to perform handshake due to timeout " +
+ "(consider increasing 'connectionTimeout' configuration property)."));
+ }
+ }
+
+ /**
+ * Performs handshake in timeout-safe way.
+ *
+ * @param ch Socket channel.
* @param recovery Recovery descriptor if use recovery handshake, otherwise {@code null}.
* @param rmtNodeId Remote node.
* @param timeout Timeout for handshake.
@@ -3371,233 +3393,215 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
* @return Handshake response.
*/
@SuppressWarnings("ThrowFromFinallyBlock")
- private <T> long safeHandshake(
- T client,
+ private long safeTcpHandshake(
+ SocketChannel ch,
@Nullable GridNioRecoveryDescriptor recovery,
UUID rmtNodeId,
long timeout,
GridSslMeta sslMeta,
@Nullable Integer handshakeConnIdx
) throws IgniteCheckedException {
- HandshakeTimeoutObject<T> obj = new HandshakeTimeoutObject<>(client, U.currentTimeMillis() + timeout);
+ HandshakeTimeoutObject obj = new HandshakeTimeoutObject<>(ch, U.currentTimeMillis() + timeout);
addTimeoutObject(obj);
long rcvCnt = 0;
try {
- if (client instanceof GridCommunicationClient)
- ((GridCommunicationClient)client).doHandshake(new HandshakeClosure(rmtNodeId));
- else {
- SocketChannel ch = (SocketChannel)client;
+ BlockingSslHandler sslHnd = null;
- boolean success = false;
+ ByteBuffer buf;
- try {
- BlockingSslHandler sslHnd = null;
+ if (isSslEnabled()) {
+ assert sslMeta != null;
- ByteBuffer buf;
+ sslHnd = new BlockingSslHandler(sslMeta.sslEngine(), ch, directBuf, ByteOrder.nativeOrder(), log);
- if (isSslEnabled()) {
- assert sslMeta != null;
+ if (!sslHnd.handshake())
+ throw new HandshakeException("SSL handshake is not completed.");
- sslHnd = new BlockingSslHandler(sslMeta.sslEngine(), ch, directBuf, ByteOrder.nativeOrder(), log);
+ ByteBuffer handBuff = sslHnd.applicationBuffer();
- if (!sslHnd.handshake())
- throw new HandshakeException("SSL handshake is not completed.");
+ if (handBuff.remaining() < NodeIdMessage.MESSAGE_FULL_SIZE) {
+ buf = ByteBuffer.allocate(1000);
- ByteBuffer handBuff = sslHnd.applicationBuffer();
+ int read = ch.read(buf);
- if (handBuff.remaining() < NodeIdMessage.MESSAGE_FULL_SIZE) {
- buf = ByteBuffer.allocate(1000);
+ if (read == -1)
+ throw new HandshakeException("Failed to read remote node ID (connection closed).");
- int read = ch.read(buf);
+ buf.flip();
- if (read == -1)
- throw new HandshakeException("Failed to read remote node ID (connection closed).");
-
- buf.flip();
-
- buf = sslHnd.decode(buf);
- }
- else
- buf = handBuff;
- }
- else {
- buf = ByteBuffer.allocate(NodeIdMessage.MESSAGE_FULL_SIZE);
+ buf = sslHnd.decode(buf);
+ }
+ else
+ buf = handBuff;
+ }
+ else {
+ buf = ByteBuffer.allocate(NodeIdMessage.MESSAGE_FULL_SIZE);
- for (int i = 0; i < NodeIdMessage.MESSAGE_FULL_SIZE; ) {
- int read = ch.read(buf);
+ for (int i = 0; i < NodeIdMessage.MESSAGE_FULL_SIZE; ) {
+ int read = ch.read(buf);
- if (read == -1)
- throw new HandshakeException("Failed to read remote node ID (connection closed).");
+ if (read == -1)
+ throw new HandshakeException("Failed to read remote node ID (connection closed).");
- i += read;
- }
- }
+ i += read;
+ }
+ }
- UUID rmtNodeId0 = U.bytesToUuid(buf.array(), Message.DIRECT_TYPE_SIZE);
+ UUID rmtNodeId0 = U.bytesToUuid(buf.array(), Message.DIRECT_TYPE_SIZE);
- if (!rmtNodeId.equals(rmtNodeId0))
- throw new HandshakeException("Remote node ID is not as expected [expected=" + rmtNodeId +
- ", rcvd=" + rmtNodeId0 + ']');
- else if (log.isDebugEnabled())
- log.debug("Received remote node ID: " + rmtNodeId0);
+ if (!rmtNodeId.equals(rmtNodeId0))
+ throw new HandshakeException("Remote node ID is not as expected [expected=" + rmtNodeId +
+ ", rcvd=" + rmtNodeId0 + ']');
+ else if (log.isDebugEnabled())
+ log.debug("Received remote node ID: " + rmtNodeId0);
- if (isSslEnabled()) {
- assert sslHnd != null;
+ if (isSslEnabled()) {
+ assert sslHnd != null;
- ch.write(sslHnd.encrypt(ByteBuffer.wrap(U.IGNITE_HEADER)));
- }
- else
- ch.write(ByteBuffer.wrap(U.IGNITE_HEADER));
+ ch.write(sslHnd.encrypt(ByteBuffer.wrap(U.IGNITE_HEADER)));
+ }
+ else
+ ch.write(ByteBuffer.wrap(U.IGNITE_HEADER));
- ClusterNode locNode = getLocalNode();
+ ClusterNode locNode = getLocalNode();
- if (locNode == null)
- throw new IgniteCheckedException("Local node has not been started or " +
- "fully initialized [isStopping=" + getSpiContext().isStopping() + ']');
+ if (locNode == null)
+ throw new IgniteCheckedException("Local node has not been started or " +
+ "fully initialized [isStopping=" + getSpiContext().isStopping() + ']');
- if (recovery != null) {
- HandshakeMessage msg;
+ if (recovery != null) {
+ HandshakeMessage msg;
- int msgSize = HandshakeMessage.MESSAGE_FULL_SIZE;
+ int msgSize = HandshakeMessage.MESSAGE_FULL_SIZE;
- if (handshakeConnIdx != null) {
- msg = new HandshakeMessage2(locNode.id(),
- recovery.incrementConnectCount(),
- recovery.received(),
- handshakeConnIdx);
+ if (handshakeConnIdx != null) {
+ msg = new HandshakeMessage2(locNode.id(),
+ recovery.incrementConnectCount(),
+ recovery.received(),
+ handshakeConnIdx);
- msgSize += 4;
- }
- else {
- msg = new HandshakeMessage(locNode.id(),
- recovery.incrementConnectCount(),
- recovery.received());
- }
+ msgSize += 4;
+ }
+ else {
+ msg = new HandshakeMessage(locNode.id(),
+ recovery.incrementConnectCount(),
+ recovery.received());
+ }
- if (log.isDebugEnabled())
- log.debug("Writing handshake message [locNodeId=" + locNode.id() +
- ", rmtNode=" + rmtNodeId + ", msg=" + msg + ']');
+ if (log.isDebugEnabled())
+ log.debug("Writing handshake message [locNodeId=" + locNode.id() +
+ ", rmtNode=" + rmtNodeId + ", msg=" + msg + ']');
- buf = ByteBuffer.allocate(msgSize);
+ buf = ByteBuffer.allocate(msgSize);
- buf.order(ByteOrder.nativeOrder());
+ buf.order(ByteOrder.nativeOrder());
- boolean written = msg.writeTo(buf, null);
+ boolean written = msg.writeTo(buf, null);
- assert written;
+ assert written;
- buf.flip();
+ buf.flip();
- if (isSslEnabled()) {
- assert sslHnd != null;
+ if (isSslEnabled()) {
+ assert sslHnd != null;
- ch.write(sslHnd.encrypt(buf));
- }
- else
- ch.write(buf);
- }
- else {
- if (isSslEnabled()) {
- assert sslHnd != null;
+ ch.write(sslHnd.encrypt(buf));
+ }
+ else
+ ch.write(buf);
+ }
+ else {
+ if (isSslEnabled()) {
+ assert sslHnd != null;
- ch.write(sslHnd.encrypt(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType)));
- }
- else
- ch.write(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType));
- }
+ ch.write(sslHnd.encrypt(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType)));
+ }
+ else
+ ch.write(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType));
+ }
- if (recovery != null) {
- if (log.isDebugEnabled())
- log.debug("Waiting for handshake [rmtNode=" + rmtNodeId + ']');
+ if (recovery != null) {
+ if (log.isDebugEnabled())
+ log.debug("Waiting for handshake [rmtNode=" + rmtNodeId + ']');
- if (isSslEnabled()) {
- assert sslHnd != null;
+ if (isSslEnabled()) {
+ assert sslHnd != null;
- buf = ByteBuffer.allocate(1000);
- buf.order(ByteOrder.nativeOrder());
+ buf = ByteBuffer.allocate(1000);
+ buf.order(ByteOrder.nativeOrder());
- ByteBuffer decode = ByteBuffer.allocate(2 * buf.capacity());
- decode.order(ByteOrder.nativeOrder());
+ ByteBuffer decode = ByteBuffer.allocate(2 * buf.capacity());
+ decode.order(ByteOrder.nativeOrder());
- for (int i = 0; i < RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE; ) {
- int read = ch.read(buf);
+ for (int i = 0; i < RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE; ) {
+ int read = ch.read(buf);
- if (read == -1)
- throw new HandshakeException("Failed to read remote node recovery handshake " +
- "(connection closed).");
+ if (read == -1)
+ throw new HandshakeException("Failed to read remote node recovery handshake " +
+ "(connection closed).");
- buf.flip();
+ buf.flip();
- ByteBuffer decode0 = sslHnd.decode(buf);
+ ByteBuffer decode0 = sslHnd.decode(buf);
- i += decode0.remaining();
+ i += decode0.remaining();
- decode = appendAndResizeIfNeeded(decode, decode0);
+ decode = appendAndResizeIfNeeded(decode, decode0);
- buf.clear();
- }
+ buf.clear();
+ }
- decode.flip();
+ decode.flip();
- rcvCnt = decode.getLong(Message.DIRECT_TYPE_SIZE);
+ rcvCnt = decode.getLong(Message.DIRECT_TYPE_SIZE);
- if (decode.limit() > RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE) {
- decode.position(RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE);
+ if (decode.limit() > RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE) {
+ decode.position(RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE);
- sslMeta.decodedBuffer(decode);
- }
+ sslMeta.decodedBuffer(decode);
+ }
- ByteBuffer inBuf = sslHnd.inputBuffer();
+ ByteBuffer inBuf = sslHnd.inputBuffer();
- if (inBuf.position() > 0)
- sslMeta.encodedBuffer(inBuf);
- }
- else {
- buf = ByteBuffer.allocate(RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE);
+ if (inBuf.position() > 0)
+ sslMeta.encodedBuffer(inBuf);
+ }
+ else {
+ buf = ByteBuffer.allocate(RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE);
- buf.order(ByteOrder.nativeOrder());
+ buf.order(ByteOrder.nativeOrder());
- for (int i = 0; i < RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE; ) {
- int read = ch.read(buf);
+ for (int i = 0; i < RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE; ) {
+ int read = ch.read(buf);
- if (read == -1)
- throw new HandshakeException("Failed to read remote node recovery handshake " +
- "(connection closed).");
+ if (read == -1)
+ throw new HandshakeException("Failed to read remote node recovery handshake " +
+ "(connection closed).");
- i += read;
- }
+ i += read;
+ }
- rcvCnt = buf.getLong(Message.DIRECT_TYPE_SIZE);
- }
+ rcvCnt = buf.getLong(Message.DIRECT_TYPE_SIZE);
+ }
- if (log.isDebugEnabled())
- log.debug("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']');
+ if (log.isDebugEnabled())
+ log.debug("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']');
- if (rcvCnt == -1) {
- if (log.isDebugEnabled())
- log.debug("Connection rejected, will retry client creation [rmtNode=" + rmtNodeId + ']');
- }
- else
- success = true;
- }
- else
- success = true;
- }
- catch (IOException e) {
+ if (rcvCnt == -1) {
if (log.isDebugEnabled())
- log.debug("Failed to read from channel: " + e);
-
- throw new IgniteCheckedException("Failed to read from channel.", e);
- }
- finally {
- if (!success)
- U.closeQuiet(ch);
+ log.debug("Connection rejected, will retry client creation [rmtNode=" + rmtNodeId + ']');
}
}
}
+ catch (IOException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to read from channel: " + e);
+
+ throw new IgniteCheckedException("Failed to read from channel.", e);
+ }
finally {
boolean cancelled = obj.cancel();
[4/6] ignite git commit: IGNITE-5928 .NET: Get rid of BinaryStreamBase
Posted by sb...@apache.org.
IGNITE-5928 .NET: Get rid of BinaryStreamBase
This closes #2835
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b8b7c508
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b8b7c508
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b8b7c508
Branch: refs/heads/ignite-3478
Commit: b8b7c50865d37449616cc6dadeeaba84526945f4
Parents: 5490c7d
Author: Alexey Popov <ta...@gmail.com>
Authored: Thu Oct 12 12:44:00 2017 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Thu Oct 12 12:44:00 2017 +0300
----------------------------------------------------------------------
.../Apache.Ignite.Core.csproj | 1 -
.../Impl/Binary/Io/BinaryHeapStream.cs | 1018 +++++++++++++-
.../Impl/Binary/Io/BinaryStreamBase.cs | 1249 ------------------
3 files changed, 958 insertions(+), 1310 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b8b7c508/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
index 58abd26..446208a 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
@@ -431,7 +431,6 @@
<Compile Include="Impl\Messaging\Messaging.cs" />
<Compile Include="Impl\NativeMethods.cs" />
<Compile Include="Impl\Binary\IO\IBinaryStream.cs" />
- <Compile Include="Impl\Binary\IO\BinaryStreamBase.cs" />
<Compile Include="Impl\Binary\IO\BinaryHeapStream.cs" />
<Compile Include="Impl\Binary\IBinaryTypeDescriptor.cs" />
<Compile Include="Impl\Binary\IBinaryWriteAware.cs" />
http://git-wip-us.apache.org/repos/asf/ignite/blob/b8b7c508/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryHeapStream.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryHeapStream.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryHeapStream.cs
index a14da0a..a6082f1 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryHeapStream.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryHeapStream.cs
@@ -22,12 +22,28 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Text;
+ using Apache.Ignite.Core.Impl.Memory;
/// <summary>
/// Binary onheap stream.
/// </summary>
- internal unsafe class BinaryHeapStream : BinaryStreamBase
+ internal unsafe class BinaryHeapStream : IBinaryStream
{
+ /** Byte: zero. */
+ private const byte ByteZero = 0;
+
+ /** Byte: one. */
+ private const byte ByteOne = 1;
+
+ /** LITTLE_ENDIAN flag. */
+ private static readonly bool LittleEndian = BitConverter.IsLittleEndian;
+
+ /** Position. */
+ private int _pos;
+
+ /** Disposed flag. */
+ private bool _disposed;
+
/** Data array. */
private byte[] _data;
@@ -53,8 +69,903 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
_data = data;
}
+ /// <summary>
+ /// Internal routine to write byte array.
+ /// </summary>
+ /// <param name="val">Byte array.</param>
+ /// <param name="data">Data pointer.</param>
+ private static void WriteByteArray0(byte[] val, byte* data)
+ {
+ fixed (byte* val0 = val)
+ {
+ CopyMemory(val0, data, val.Length);
+ }
+ }
+
+ /// <summary>
+ /// Internal routine to read byte array.
+ /// </summary>
+ /// <param name="len">Array length.</param>
+ /// <param name="data">Data pointer.</param>
+ /// <returns>Byte array</returns>
+ private static byte[] ReadByteArray0(int len, byte* data)
+ {
+ byte[] res = new byte[len];
+
+ fixed (byte* res0 = res)
+ {
+ CopyMemory(data, res0, len);
+ }
+
+ return res;
+ }
+
+ /** <inheritdoc /> */
+ public void WriteBool(bool val)
+ {
+ WriteByte(val ? ByteOne : ByteZero);
+ }
+
+ /** <inheritdoc /> */
+ public bool ReadBool()
+ {
+ return ReadByte() == ByteOne;
+ }
+
+ /// <summary>
+ /// Internal routine to write bool array.
+ /// </summary>
+ /// <param name="val">Bool array.</param>
+ /// <param name="data">Data pointer.</param>
+ private static void WriteBoolArray0(bool[] val, byte* data)
+ {
+ fixed (bool* val0 = val)
+ {
+ CopyMemory((byte*)val0, data, val.Length);
+ }
+ }
+
+ /// <summary>
+ /// Internal routine to read bool array.
+ /// </summary>
+ /// <param name="len">Array length.</param>
+ /// <param name="data">Data pointer.</param>
+ /// <returns>Bool array</returns>
+ private static bool[] ReadBoolArray0(int len, byte* data)
+ {
+ bool[] res = new bool[len];
+
+ fixed (bool* res0 = res)
+ {
+ CopyMemory(data, (byte*)res0, len);
+ }
+
+ return res;
+ }
+
+ /// <summary>
+ /// Internal routine to write short value.
+ /// </summary>
+ /// <param name="val">Short value.</param>
+ /// <param name="data">Data pointer.</param>
+ private static void WriteShort0(short val, byte* data)
+ {
+ if (LittleEndian)
+ *((short*)data) = val;
+ else
+ {
+ byte* valPtr = (byte*)&val;
+
+ data[0] = valPtr[1];
+ data[1] = valPtr[0];
+ }
+ }
+
+ /// <summary>
+ /// Internal routine to read short value.
+ /// </summary>
+ /// <param name="data">Data pointer.</param>
+ /// <returns>Short value</returns>
+ private static short ReadShort0(byte* data)
+ {
+ short val;
+
+ if (LittleEndian)
+ val = *((short*)data);
+ else
+ {
+ byte* valPtr = (byte*)&val;
+
+ valPtr[0] = data[1];
+ valPtr[1] = data[0];
+ }
+
+ return val;
+ }
+
+ /// <summary>
+ /// Internal routine to write short array.
+ /// </summary>
+ /// <param name="val">Short array.</param>
+ /// <param name="data">Data pointer.</param>
+ /// <param name="cnt">Bytes count.</param>
+ private static void WriteShortArray0(short[] val, byte* data, int cnt)
+ {
+ if (LittleEndian)
+ {
+ fixed (short* val0 = val)
+ {
+ CopyMemory((byte*)val0, data, cnt);
+ }
+ }
+ else
+ {
+ byte* curPos = data;
+
+ for (int i = 0; i < val.Length; i++)
+ {
+ short val0 = val[i];
+
+ byte* valPtr = (byte*)&(val0);
+
+ *curPos++ = valPtr[1];
+ *curPos++ = valPtr[0];
+ }
+ }
+ }
+
+ /// <summary>
+ /// Internal routine to read short array.
+ /// </summary>
+ /// <param name="len">Array length.</param>
+ /// <param name="data">Data pointer.</param>
+ /// <param name="cnt">Bytes count.</param>
+ /// <returns>Short array</returns>
+ private static short[] ReadShortArray0(int len, byte* data, int cnt)
+ {
+ short[] res = new short[len];
+
+ if (LittleEndian)
+ {
+ fixed (short* res0 = res)
+ {
+ CopyMemory(data, (byte*)res0, cnt);
+ }
+ }
+ else
+ {
+ for (int i = 0; i < len; i++)
+ {
+ short val;
+
+ byte* valPtr = (byte*)&val;
+
+ valPtr[1] = *data++;
+ valPtr[0] = *data++;
+
+ res[i] = val;
+ }
+ }
+
+ return res;
+ }
+
+ /** <inheritdoc /> */
+ public void WriteChar(char val)
+ {
+ WriteShort(*(short*)(&val));
+ }
+
+ /** <inheritdoc /> */
+ public char ReadChar()
+ {
+ short val = ReadShort();
+
+ return *(char*)(&val);
+ }
+
+ /// <summary>
+ /// Internal routine to write char array.
+ /// </summary>
+ /// <param name="val">Char array.</param>
+ /// <param name="data">Data pointer.</param>
+ /// <param name="cnt">Bytes count.</param>
+ private static void WriteCharArray0(char[] val, byte* data, int cnt)
+ {
+ if (LittleEndian)
+ {
+ fixed (char* val0 = val)
+ {
+ CopyMemory((byte*)val0, data, cnt);
+ }
+ }
+ else
+ {
+ byte* curPos = data;
+
+ for (int i = 0; i < val.Length; i++)
+ {
+ char val0 = val[i];
+
+ byte* valPtr = (byte*)&(val0);
+
+ *curPos++ = valPtr[1];
+ *curPos++ = valPtr[0];
+ }
+ }
+ }
+
+ /// <summary>
+ /// Internal routine to read char array.
+ /// </summary>
+ /// <param name="len">Count.</param>
+ /// <param name="data">Data pointer.</param>
+ /// <param name="cnt">Bytes count.</param>
+ /// <returns>Char array</returns>
+ private static char[] ReadCharArray0(int len, byte* data, int cnt)
+ {
+ char[] res = new char[len];
+
+ if (LittleEndian)
+ {
+ fixed (char* res0 = res)
+ {
+ CopyMemory(data, (byte*)res0, cnt);
+ }
+ }
+ else
+ {
+ for (int i = 0; i < len; i++)
+ {
+ char val;
+
+ byte* valPtr = (byte*)&val;
+
+ valPtr[1] = *data++;
+ valPtr[0] = *data++;
+
+ res[i] = val;
+ }
+ }
+
+ return res;
+ }
+
+ /// <summary>
+ /// Internal routine to write int value.
+ /// </summary>
+ /// <param name="val">Int value.</param>
+ /// <param name="data">Data pointer.</param>
+ private static void WriteInt0(int val, byte* data)
+ {
+ if (LittleEndian)
+ *((int*)data) = val;
+ else
+ {
+ byte* valPtr = (byte*)&val;
+
+ data[0] = valPtr[3];
+ data[1] = valPtr[2];
+ data[2] = valPtr[1];
+ data[3] = valPtr[0];
+ }
+ }
+
+ /// <summary>
+ /// Internal routine to read int value.
+ /// </summary>
+ /// <param name="data">Data pointer.</param>
+ /// <returns>Int value</returns>
+ private static int ReadInt0(byte* data) {
+ int val;
+
+ if (LittleEndian)
+ val = *((int*)data);
+ else
+ {
+ byte* valPtr = (byte*)&val;
+
+ valPtr[0] = data[3];
+ valPtr[1] = data[2];
+ valPtr[2] = data[1];
+ valPtr[3] = data[0];
+ }
+
+ return val;
+ }
+
+ /// <summary>
+ /// Internal routine to write int array.
+ /// </summary>
+ /// <param name="val">Int array.</param>
+ /// <param name="data">Data pointer.</param>
+ /// <param name="cnt">Bytes count.</param>
+ private static void WriteIntArray0(int[] val, byte* data, int cnt)
+ {
+ if (LittleEndian)
+ {
+ fixed (int* val0 = val)
+ {
+ CopyMemory((byte*)val0, data, cnt);
+ }
+ }
+ else
+ {
+ byte* curPos = data;
+
+ for (int i = 0; i < val.Length; i++)
+ {
+ int val0 = val[i];
+
+ byte* valPtr = (byte*)&(val0);
+
+ *curPos++ = valPtr[3];
+ *curPos++ = valPtr[2];
+ *curPos++ = valPtr[1];
+ *curPos++ = valPtr[0];
+ }
+ }
+ }
+
+ /// <summary>
+ /// Internal routine to read int array.
+ /// </summary>
+ /// <param name="len">Count.</param>
+ /// <param name="data">Data pointer.</param>
+ /// <param name="cnt">Bytes count.</param>
+ /// <returns>Int array</returns>
+ private static int[] ReadIntArray0(int len, byte* data, int cnt)
+ {
+ int[] res = new int[len];
+
+ if (LittleEndian)
+ {
+ fixed (int* res0 = res)
+ {
+ CopyMemory(data, (byte*)res0, cnt);
+ }
+ }
+ else
+ {
+ for (int i = 0; i < len; i++)
+ {
+ int val;
+
+ byte* valPtr = (byte*)&val;
+
+ valPtr[3] = *data++;
+ valPtr[2] = *data++;
+ valPtr[1] = *data++;
+ valPtr[0] = *data++;
+
+ res[i] = val;
+ }
+ }
+
+ return res;
+ }
+
+ /** <inheritdoc /> */
+ public void WriteFloat(float val)
+ {
+ int val0 = *(int*)(&val);
+
+ WriteInt(val0);
+ }
+
+ /** <inheritdoc /> */
+ public float ReadFloat()
+ {
+ int val = ReadInt();
+
+ return BinaryUtils.IntToFloatBits(val);
+ }
+
+ /// <summary>
+ /// Internal routine to write float array.
+ /// </summary>
+ /// <param name="val">Float array.</param>
+ /// <param name="data">Data pointer.</param>
+ /// <param name="cnt">Bytes count.</param>
+ private static void WriteFloatArray0(float[] val, byte* data, int cnt)
+ {
+ if (LittleEndian)
+ {
+ fixed (float* val0 = val)
+ {
+ CopyMemory((byte*)val0, data, cnt);
+ }
+ }
+ else
+ {
+ byte* curPos = data;
+
+ for (int i = 0; i < val.Length; i++)
+ {
+ float val0 = val[i];
+
+ byte* valPtr = (byte*)&(val0);
+
+ *curPos++ = valPtr[3];
+ *curPos++ = valPtr[2];
+ *curPos++ = valPtr[1];
+ *curPos++ = valPtr[0];
+ }
+ }
+ }
+
+ /// <summary>
+ /// Internal routine to read float array.
+ /// </summary>
+ /// <param name="len">Count.</param>
+ /// <param name="data">Data pointer.</param>
+ /// <param name="cnt">Bytes count.</param>
+ /// <returns>Float array</returns>
+ private static float[] ReadFloatArray0(int len, byte* data, int cnt)
+ {
+ float[] res = new float[len];
+
+ if (LittleEndian)
+ {
+ fixed (float* res0 = res)
+ {
+ CopyMemory(data, (byte*)res0, cnt);
+ }
+ }
+ else
+ {
+ for (int i = 0; i < len; i++)
+ {
+ int val;
+
+ byte* valPtr = (byte*)&val;
+
+ valPtr[3] = *data++;
+ valPtr[2] = *data++;
+ valPtr[1] = *data++;
+ valPtr[0] = *data++;
+
+ res[i] = val;
+ }
+ }
+
+ return res;
+ }
+
+ /// <summary>
+ /// Internal routine to write long value.
+ /// </summary>
+ /// <param name="val">Long value.</param>
+ /// <param name="data">Data pointer.</param>
+ private static void WriteLong0(long val, byte* data)
+ {
+ if (LittleEndian)
+ *((long*)data) = val;
+ else
+ {
+ byte* valPtr = (byte*)&val;
+
+ data[0] = valPtr[7];
+ data[1] = valPtr[6];
+ data[2] = valPtr[5];
+ data[3] = valPtr[4];
+ data[4] = valPtr[3];
+ data[5] = valPtr[2];
+ data[6] = valPtr[1];
+ data[7] = valPtr[0];
+ }
+ }
+
+ /// <summary>
+ /// Internal routine to read long value.
+ /// </summary>
+ /// <param name="data">Data pointer.</param>
+ /// <returns>Long value</returns>
+ private static long ReadLong0(byte* data)
+ {
+ long val;
+
+ if (LittleEndian)
+ val = *((long*)data);
+ else
+ {
+ byte* valPtr = (byte*)&val;
+
+ valPtr[0] = data[7];
+ valPtr[1] = data[6];
+ valPtr[2] = data[5];
+ valPtr[3] = data[4];
+ valPtr[4] = data[3];
+ valPtr[5] = data[2];
+ valPtr[6] = data[1];
+ valPtr[7] = data[0];
+ }
+
+ return val;
+ }
+
+ /// <summary>
+ /// Internal routine to write long array.
+ /// </summary>
+ /// <param name="val">Long array.</param>
+ /// <param name="data">Data pointer.</param>
+ /// <param name="cnt">Bytes count.</param>
+ private static void WriteLongArray0(long[] val, byte* data, int cnt)
+ {
+ if (LittleEndian)
+ {
+ fixed (long* val0 = val)
+ {
+ CopyMemory((byte*)val0, data, cnt);
+ }
+ }
+ else
+ {
+ byte* curPos = data;
+
+ for (int i = 0; i < val.Length; i++)
+ {
+ long val0 = val[i];
+
+ byte* valPtr = (byte*)&(val0);
+
+ *curPos++ = valPtr[7];
+ *curPos++ = valPtr[6];
+ *curPos++ = valPtr[5];
+ *curPos++ = valPtr[4];
+ *curPos++ = valPtr[3];
+ *curPos++ = valPtr[2];
+ *curPos++ = valPtr[1];
+ *curPos++ = valPtr[0];
+ }
+ }
+ }
+
+ /// <summary>
+ /// Internal routine to read long array.
+ /// </summary>
+ /// <param name="len">Count.</param>
+ /// <param name="data">Data pointer.</param>
+ /// <param name="cnt">Bytes count.</param>
+ /// <returns>Long array</returns>
+ private static long[] ReadLongArray0(int len, byte* data, int cnt)
+ {
+ long[] res = new long[len];
+
+ if (LittleEndian)
+ {
+ fixed (long* res0 = res)
+ {
+ CopyMemory(data, (byte*)res0, cnt);
+ }
+ }
+ else
+ {
+ for (int i = 0; i < len; i++)
+ {
+ long val;
+
+ byte* valPtr = (byte*)&val;
+
+ valPtr[7] = *data++;
+ valPtr[6] = *data++;
+ valPtr[5] = *data++;
+ valPtr[4] = *data++;
+ valPtr[3] = *data++;
+ valPtr[2] = *data++;
+ valPtr[1] = *data++;
+ valPtr[0] = *data++;
+
+ res[i] = val;
+ }
+ }
+
+ return res;
+ }
+
+ /** <inheritdoc /> */
+ public void WriteDouble(double val)
+ {
+ long val0 = *(long*)(&val);
+
+ WriteLong(val0);
+ }
+
+ /** <inheritdoc /> */
+ public double ReadDouble()
+ {
+ long val = ReadLong();
+
+ return BinaryUtils.LongToDoubleBits(val);
+ }
+
+ /// <summary>
+ /// Internal routine to write double array.
+ /// </summary>
+ /// <param name="val">Double array.</param>
+ /// <param name="data">Data pointer.</param>
+ /// <param name="cnt">Bytes count.</param>
+ private static void WriteDoubleArray0(double[] val, byte* data, int cnt)
+ {
+ if (LittleEndian)
+ {
+ fixed (double* val0 = val)
+ {
+ CopyMemory((byte*)val0, data, cnt);
+ }
+ }
+ else
+ {
+ byte* curPos = data;
+
+ for (int i = 0; i < val.Length; i++)
+ {
+ double val0 = val[i];
+
+ byte* valPtr = (byte*)&(val0);
+
+ *curPos++ = valPtr[7];
+ *curPos++ = valPtr[6];
+ *curPos++ = valPtr[5];
+ *curPos++ = valPtr[4];
+ *curPos++ = valPtr[3];
+ *curPos++ = valPtr[2];
+ *curPos++ = valPtr[1];
+ *curPos++ = valPtr[0];
+ }
+ }
+ }
+
+ /// <summary>
+ /// Internal routine to read double array.
+ /// </summary>
+ /// <param name="len">Count.</param>
+ /// <param name="data">Data pointer.</param>
+ /// <param name="cnt">Bytes count.</param>
+ /// <returns>Double array</returns>
+ private static double[] ReadDoubleArray0(int len, byte* data, int cnt)
+ {
+ double[] res = new double[len];
+
+ if (LittleEndian)
+ {
+ fixed (double* res0 = res)
+ {
+ CopyMemory(data, (byte*)res0, cnt);
+ }
+ }
+ else
+ {
+ for (int i = 0; i < len; i++)
+ {
+ double val;
+
+ byte* valPtr = (byte*)&val;
+
+ valPtr[7] = *data++;
+ valPtr[6] = *data++;
+ valPtr[5] = *data++;
+ valPtr[4] = *data++;
+ valPtr[3] = *data++;
+ valPtr[2] = *data++;
+ valPtr[1] = *data++;
+ valPtr[0] = *data++;
+
+ res[i] = val;
+ }
+ }
+
+ return res;
+ }
+
+ /** <inheritdoc /> */
+ public void Write(byte[] src, int off, int cnt)
+ {
+ fixed (byte* src0 = src)
+ {
+ Write(src0 + off, cnt);
+ }
+ }
+
+ /** <inheritdoc /> */
+ public void Read(byte[] dest, int off, int cnt)
+ {
+ fixed (byte* dest0 = dest)
+ {
+ Read(dest0 + off, cnt);
+ }
+ }
+
+ /// <summary>
+ /// Internal write routine.
+ /// </summary>
+ /// <param name="src">Source.</param>
+ /// <param name="cnt">Count.</param>
+ /// <param name="data">Data (destination).</param>
+ private void WriteInternal(byte* src, int cnt, byte* data)
+ {
+ CopyMemory(src, data + _pos, cnt);
+ }
+
+ /// <summary>
+ /// Internal read routine.
+ /// </summary>
+ /// <param name="src">Source</param>
+ /// <param name="dest">Destination.</param>
+ /// <param name="cnt">Count.</param>
+ /// <returns>Amount of bytes written.</returns>
+ private void ReadInternal(byte* src, byte* dest, int cnt)
+ {
+ int cnt0 = Math.Min(Remaining, cnt);
+
+ CopyMemory(src + _pos, dest, cnt0);
+
+ ShiftRead(cnt0);
+ }
+
+ /** <inheritdoc /> */
+ public int Position
+ {
+ get { return _pos; }
+ }
+
/** <inheritdoc /> */
- public override void WriteByte(byte val)
+ public int Remaining
+ {
+ get { return _data.Length - _pos; }
+ }
+
+ /// <summary>
+ /// Internal array.
+ /// </summary>
+ internal byte[] InternalArray
+ {
+ get { return _data; }
+ }
+
+ /// <inheritdoc />
+ /// <exception cref="T:System.ArgumentException">
+ /// Unsupported seek origin: + origin
+ /// or
+ /// Seek before origin: + newPos
+ /// </exception>
+ public int Seek(int offset, SeekOrigin origin)
+ {
+ int newPos;
+
+ switch (origin)
+ {
+ case SeekOrigin.Begin:
+ {
+ newPos = offset;
+
+ break;
+ }
+
+ case SeekOrigin.Current:
+ {
+ newPos = _pos + offset;
+
+ break;
+ }
+
+ default:
+ throw new ArgumentException("Unsupported seek origin: " + origin);
+ }
+
+ if (newPos < 0)
+ throw new ArgumentException("Seek before origin: " + newPos);
+
+ EnsureWriteCapacity(newPos);
+
+ _pos = newPos;
+
+ return _pos;
+ }
+
+ /** <inheritdoc /> */
+ public void Flush()
+ {
+ // No-op.
+ }
+
+ /** <inheritdoc /> */
+ public void Dispose()
+ {
+ if (_disposed)
+ return;
+
+ GC.SuppressFinalize(this);
+
+ _disposed = true;
+ }
+
+ /// <summary>
+ /// Ensure capacity for write and shift position.
+ /// </summary>
+ /// <param name="cnt">Bytes count.</param>
+ /// <returns>Position before shift.</returns>
+ private int EnsureWriteCapacityAndShift(int cnt)
+ {
+ int pos0 = _pos;
+
+ EnsureWriteCapacity(_pos + cnt);
+
+ ShiftWrite(cnt);
+
+ return pos0;
+ }
+
+ /// <summary>
+ /// Ensure capacity for read and shift position.
+ /// </summary>
+ /// <param name="cnt">Bytes count.</param>
+ /// <returns>Position before shift.</returns>
+ private int EnsureReadCapacityAndShift(int cnt)
+ {
+ int pos0 = _pos;
+
+ EnsureReadCapacity(cnt);
+
+ ShiftRead(cnt);
+
+ return pos0;
+ }
+
+ /// <summary>
+ /// Shift position due to write
+ /// </summary>
+ /// <param name="cnt">Bytes count.</param>
+ private void ShiftWrite(int cnt)
+ {
+ _pos += cnt;
+ }
+
+ /// <summary>
+ /// Shift position due to read.
+ /// </summary>
+ /// <param name="cnt">Bytes count.</param>
+ private void ShiftRead(int cnt)
+ {
+ _pos += cnt;
+ }
+
+ /// <summary>
+ /// Calculate new capacity.
+ /// </summary>
+ /// <param name="curCap">Current capacity.</param>
+ /// <param name="reqCap">Required capacity.</param>
+ /// <returns>New capacity.</returns>
+ private static int Capacity(int curCap, int reqCap)
+ {
+ int newCap;
+
+ if (reqCap < 256)
+ newCap = 256;
+ else
+ {
+ newCap = curCap << 1;
+
+ if (newCap < reqCap)
+ newCap = reqCap;
+ }
+
+ return newCap;
+ }
+
+ /// <summary>
+ /// Unsafe memory copy routine.
+ /// </summary>
+ /// <param name="src">Source.</param>
+ /// <param name="dest">Destination.</param>
+ /// <param name="len">Length.</param>
+ private static void CopyMemory(byte* src, byte* dest, int len)
+ {
+ PlatformMemoryUtils.CopyMemory(src, dest, len);
+ }
+
+ /** <inheritdoc /> */
+ public void WriteByte(byte val)
{
int pos0 = EnsureWriteCapacityAndShift(1);
@@ -62,7 +973,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
}
/** <inheritdoc /> */
- public override byte ReadByte()
+ public byte ReadByte()
{
int pos0 = EnsureReadCapacityAndShift(1);
@@ -71,7 +982,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
/** <inheritdoc /> */
[SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
- public override void WriteByteArray(byte[] val)
+ public void WriteByteArray(byte[] val)
{
int pos0 = EnsureWriteCapacityAndShift(val.Length);
@@ -82,7 +993,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
}
/** <inheritdoc /> */
- public override byte[] ReadByteArray(int cnt)
+ public byte[] ReadByteArray(int cnt)
{
int pos0 = EnsureReadCapacityAndShift(cnt);
@@ -94,7 +1005,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
/** <inheritdoc /> */
[SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
- public override void WriteBoolArray(bool[] val)
+ public void WriteBoolArray(bool[] val)
{
int pos0 = EnsureWriteCapacityAndShift(val.Length);
@@ -105,7 +1016,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
}
/** <inheritdoc /> */
- public override bool[] ReadBoolArray(int cnt)
+ public bool[] ReadBoolArray(int cnt)
{
int pos0 = EnsureReadCapacityAndShift(cnt);
@@ -116,7 +1027,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
}
/** <inheritdoc /> */
- public override void WriteShort(short val)
+ public void WriteShort(short val)
{
int pos0 = EnsureWriteCapacityAndShift(2);
@@ -127,7 +1038,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
}
/** <inheritdoc /> */
- public override short ReadShort()
+ public short ReadShort()
{
int pos0 = EnsureReadCapacityAndShift(2);
@@ -139,7 +1050,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
/** <inheritdoc /> */
[SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
- public override void WriteShortArray(short[] val)
+ public void WriteShortArray(short[] val)
{
int cnt = val.Length << 1;
@@ -152,7 +1063,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
}
/** <inheritdoc /> */
- public override short[] ReadShortArray(int cnt)
+ public short[] ReadShortArray(int cnt)
{
int cnt0 = cnt << 1;
@@ -166,7 +1077,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
/** <inheritdoc /> */
[SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
- public override void WriteCharArray(char[] val)
+ public void WriteCharArray(char[] val)
{
int cnt = val.Length << 1;
@@ -179,7 +1090,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
}
/** <inheritdoc /> */
- public override char[] ReadCharArray(int cnt)
+ public char[] ReadCharArray(int cnt)
{
int cnt0 = cnt << 1;
@@ -192,7 +1103,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
}
/** <inheritdoc /> */
- public override void WriteInt(int val)
+ public void WriteInt(int val)
{
int pos0 = EnsureWriteCapacityAndShift(4);
@@ -203,7 +1114,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
}
/** <inheritdoc /> */
- public override void WriteInt(int writePos, int val)
+ public void WriteInt(int writePos, int val)
{
EnsureWriteCapacity(writePos + 4);
@@ -214,7 +1125,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
}
/** <inheritdoc /> */
- public override int ReadInt()
+ public int ReadInt()
{
int pos0 = EnsureReadCapacityAndShift(4);
@@ -226,7 +1137,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
/** <inheritdoc /> */
[SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
- public override void WriteIntArray(int[] val)
+ public void WriteIntArray(int[] val)
{
int cnt = val.Length << 2;
@@ -239,7 +1150,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
}
/** <inheritdoc /> */
- public override int[] ReadIntArray(int cnt)
+ public int[] ReadIntArray(int cnt)
{
int cnt0 = cnt << 2;
@@ -253,7 +1164,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
/** <inheritdoc /> */
[SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
- public override void WriteFloatArray(float[] val)
+ public void WriteFloatArray(float[] val)
{
int cnt = val.Length << 2;
@@ -266,7 +1177,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
}
/** <inheritdoc /> */
- public override float[] ReadFloatArray(int cnt)
+ public float[] ReadFloatArray(int cnt)
{
int cnt0 = cnt << 2;
@@ -279,7 +1190,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
}
/** <inheritdoc /> */
- public override void WriteLong(long val)
+ public void WriteLong(long val)
{
int pos0 = EnsureWriteCapacityAndShift(8);
@@ -290,7 +1201,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
}
/** <inheritdoc /> */
- public override long ReadLong()
+ public long ReadLong()
{
int pos0 = EnsureReadCapacityAndShift(8);
@@ -302,7 +1213,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
/** <inheritdoc /> */
[SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
- public override void WriteLongArray(long[] val)
+ public void WriteLongArray(long[] val)
{
int cnt = val.Length << 3;
@@ -315,7 +1226,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
}
/** <inheritdoc /> */
- public override long[] ReadLongArray(int cnt)
+ public long[] ReadLongArray(int cnt)
{
int cnt0 = cnt << 3;
@@ -329,7 +1240,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
/** <inheritdoc /> */
[SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
- public override void WriteDoubleArray(double[] val)
+ public void WriteDoubleArray(double[] val)
{
int cnt = val.Length << 3;
@@ -342,7 +1253,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
}
/** <inheritdoc /> */
- public override double[] ReadDoubleArray(int cnt)
+ public double[] ReadDoubleArray(int cnt)
{
int cnt0 = cnt << 3;
@@ -355,7 +1266,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
}
/** <inheritdoc /> */
- public override int WriteString(char* chars, int charCnt, int byteCnt, Encoding encoding)
+ public int WriteString(char* chars, int charCnt, int byteCnt, Encoding encoding)
{
int pos0 = EnsureWriteCapacityAndShift(byteCnt);
@@ -370,9 +1281,9 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
}
/** <inheritdoc /> */
- public override void Write(byte* src, int cnt)
+ public void Write(byte* src, int cnt)
{
- EnsureWriteCapacity(Pos + cnt);
+ EnsureWriteCapacity(_pos + cnt);
fixed (byte* data0 = _data)
{
@@ -383,7 +1294,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
}
/** <inheritdoc /> */
- public override void Read(byte* dest, int cnt)
+ public void Read(byte* dest, int cnt)
{
fixed (byte* data0 = _data)
{
@@ -392,36 +1303,30 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
}
/** <inheritdoc /> */
- public override int Remaining
- {
- get { return _data.Length - Pos; }
- }
-
- /** <inheritdoc /> */
- public override byte[] GetArray()
+ public byte[] GetArray()
{
return _data;
}
/** <inheritdoc /> */
- public override byte[] GetArrayCopy()
+ public byte[] GetArrayCopy()
{
- byte[] copy = new byte[Pos];
+ byte[] copy = new byte[_pos];
- Buffer.BlockCopy(_data, 0, copy, 0, Pos);
+ Buffer.BlockCopy(_data, 0, copy, 0, _pos);
return copy;
}
/** <inheritdoc /> */
- public override bool IsSameArray(byte[] arr)
+ public bool IsSameArray(byte[] arr)
{
return _data == arr;
}
/** <inheritdoc /> */
[SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
- public override T Apply<TArg, T>(IBinaryStreamProcessor<TArg, T> proc, TArg arg)
+ public T Apply<TArg, T>(IBinaryStreamProcessor<TArg, T> proc, TArg arg)
{
Debug.Assert(proc != null);
@@ -431,22 +1336,11 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
}
}
- /** <inheritdoc /> */
- protected override void Dispose(bool disposing)
- {
- // No-op.
- }
-
/// <summary>
- /// Internal array.
+ /// Ensure capacity for write.
/// </summary>
- internal byte[] InternalArray
- {
- get { return _data; }
- }
-
- /** <inheritdoc /> */
- protected override void EnsureWriteCapacity(int cnt)
+ /// <param name="cnt">Bytes count.</param>
+ private void EnsureWriteCapacity(int cnt)
{
if (cnt > _data.Length)
{
@@ -462,12 +1356,16 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
}
}
- /** <inheritdoc /> */
- protected override void EnsureReadCapacity(int cnt)
+ /// <summary>
+ /// Ensure capacity for read.
+ /// </summary>
+ /// <param name="cnt">Bytes count.</param>
+ /// <returns>Position before shift.</returns>
+ private void EnsureReadCapacity(int cnt)
{
- if (_data.Length - Pos < cnt)
+ if (_data.Length - _pos < cnt)
throw new EndOfStreamException("Not enough data in stream [expected=" + cnt +
- ", remaining=" + (_data.Length - Pos) + ']');
+ ", remaining=" + (_data.Length - _pos) + ']');
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b8b7c508/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryStreamBase.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryStreamBase.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryStreamBase.cs
deleted file mode 100644
index 0b855f8..0000000
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryStreamBase.cs
+++ /dev/null
@@ -1,1249 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache.Ignite.Core.Impl.Binary.IO
-{
- using System;
- using System.IO;
- using System.Text;
- using Apache.Ignite.Core.Impl.Memory;
-
- /// <summary>
- /// Base class for managed and unmanaged data streams.
- /// </summary>
- internal abstract unsafe class BinaryStreamBase : IBinaryStream
- {
- /** Byte: zero. */
- private const byte ByteZero = 0;
-
- /** Byte: one. */
- private const byte ByteOne = 1;
-
- /** LITTLE_ENDIAN flag. */
- private static readonly bool LittleEndian = BitConverter.IsLittleEndian;
-
- /** Position. */
- protected int Pos;
-
- /** Disposed flag. */
- private bool _disposed;
-
- /// <summary>
- /// Write byte.
- /// </summary>
- /// <param name="val">Byte value.</param>
- public abstract void WriteByte(byte val);
-
- /// <summary>
- /// Read byte.
- /// </summary>
- /// <returns>
- /// Byte value.
- /// </returns>
- public abstract byte ReadByte();
-
- /// <summary>
- /// Write byte array.
- /// </summary>
- /// <param name="val">Byte array.</param>
- public abstract void WriteByteArray(byte[] val);
-
- /// <summary>
- /// Internal routine to write byte array.
- /// </summary>
- /// <param name="val">Byte array.</param>
- /// <param name="data">Data pointer.</param>
- protected static void WriteByteArray0(byte[] val, byte* data)
- {
- fixed (byte* val0 = val)
- {
- CopyMemory(val0, data, val.Length);
- }
- }
-
- /// <summary>
- /// Read byte array.
- /// </summary>
- /// <param name="cnt">Count.</param>
- /// <returns>
- /// Byte array.
- /// </returns>
- public abstract byte[] ReadByteArray(int cnt);
-
- /// <summary>
- /// Internal routine to read byte array.
- /// </summary>
- /// <param name="len">Array length.</param>
- /// <param name="data">Data pointer.</param>
- /// <returns>Byte array</returns>
- protected static byte[] ReadByteArray0(int len, byte* data)
- {
- byte[] res = new byte[len];
-
- fixed (byte* res0 = res)
- {
- CopyMemory(data, res0, len);
- }
-
- return res;
- }
-
- /// <summary>
- /// Write bool.
- /// </summary>
- /// <param name="val">Bool value.</param>
- public void WriteBool(bool val)
- {
- WriteByte(val ? ByteOne : ByteZero);
- }
-
- /// <summary>
- /// Read bool.
- /// </summary>
- /// <returns>
- /// Bool value.
- /// </returns>
- public bool ReadBool()
- {
- return ReadByte() == ByteOne;
- }
-
- /// <summary>
- /// Write bool array.
- /// </summary>
- /// <param name="val">Bool array.</param>
- public abstract void WriteBoolArray(bool[] val);
-
- /// <summary>
- /// Internal routine to write bool array.
- /// </summary>
- /// <param name="val">Bool array.</param>
- /// <param name="data">Data pointer.</param>
- protected static void WriteBoolArray0(bool[] val, byte* data)
- {
- fixed (bool* val0 = val)
- {
- CopyMemory((byte*)val0, data, val.Length);
- }
- }
-
- /// <summary>
- /// Read bool array.
- /// </summary>
- /// <param name="cnt">Count.</param>
- /// <returns>
- /// Bool array.
- /// </returns>
- public abstract bool[] ReadBoolArray(int cnt);
-
- /// <summary>
- /// Internal routine to read bool array.
- /// </summary>
- /// <param name="len">Array length.</param>
- /// <param name="data">Data pointer.</param>
- /// <returns>Bool array</returns>
- protected static bool[] ReadBoolArray0(int len, byte* data)
- {
- bool[] res = new bool[len];
-
- fixed (bool* res0 = res)
- {
- CopyMemory(data, (byte*)res0, len);
- }
-
- return res;
- }
-
- /// <summary>
- /// Write short.
- /// </summary>
- /// <param name="val">Short value.</param>
- public abstract void WriteShort(short val);
-
- /// <summary>
- /// Internal routine to write short value.
- /// </summary>
- /// <param name="val">Short value.</param>
- /// <param name="data">Data pointer.</param>
- protected static void WriteShort0(short val, byte* data)
- {
- if (LittleEndian)
- *((short*)data) = val;
- else
- {
- byte* valPtr = (byte*)&val;
-
- data[0] = valPtr[1];
- data[1] = valPtr[0];
- }
- }
-
- /// <summary>
- /// Read short.
- /// </summary>
- /// <returns>
- /// Short value.
- /// </returns>
- public abstract short ReadShort();
-
- /// <summary>
- /// Internal routine to read short value.
- /// </summary>
- /// <param name="data">Data pointer.</param>
- /// <returns>Short value</returns>
- protected static short ReadShort0(byte* data)
- {
- short val;
-
- if (LittleEndian)
- val = *((short*)data);
- else
- {
- byte* valPtr = (byte*)&val;
-
- valPtr[0] = data[1];
- valPtr[1] = data[0];
- }
-
- return val;
- }
-
- /// <summary>
- /// Write short array.
- /// </summary>
- /// <param name="val">Short array.</param>
- public abstract void WriteShortArray(short[] val);
-
- /// <summary>
- /// Internal routine to write short array.
- /// </summary>
- /// <param name="val">Short array.</param>
- /// <param name="data">Data pointer.</param>
- /// <param name="cnt">Bytes count.</param>
- protected static void WriteShortArray0(short[] val, byte* data, int cnt)
- {
- if (LittleEndian)
- {
- fixed (short* val0 = val)
- {
- CopyMemory((byte*)val0, data, cnt);
- }
- }
- else
- {
- byte* curPos = data;
-
- for (int i = 0; i < val.Length; i++)
- {
- short val0 = val[i];
-
- byte* valPtr = (byte*)&(val0);
-
- *curPos++ = valPtr[1];
- *curPos++ = valPtr[0];
- }
- }
- }
-
- /// <summary>
- /// Read short array.
- /// </summary>
- /// <param name="cnt">Count.</param>
- /// <returns>
- /// Short array.
- /// </returns>
- public abstract short[] ReadShortArray(int cnt);
-
- /// <summary>
- /// Internal routine to read short array.
- /// </summary>
- /// <param name="len">Array length.</param>
- /// <param name="data">Data pointer.</param>
- /// <param name="cnt">Bytes count.</param>
- /// <returns>Short array</returns>
- protected static short[] ReadShortArray0(int len, byte* data, int cnt)
- {
- short[] res = new short[len];
-
- if (LittleEndian)
- {
- fixed (short* res0 = res)
- {
- CopyMemory(data, (byte*)res0, cnt);
- }
- }
- else
- {
- for (int i = 0; i < len; i++)
- {
- short val;
-
- byte* valPtr = (byte*)&val;
-
- valPtr[1] = *data++;
- valPtr[0] = *data++;
-
- res[i] = val;
- }
- }
-
- return res;
- }
-
- /// <summary>
- /// Write char.
- /// </summary>
- /// <param name="val">Char value.</param>
- public void WriteChar(char val)
- {
- WriteShort(*(short*)(&val));
- }
-
- /// <summary>
- /// Read char.
- /// </summary>
- /// <returns>
- /// Char value.
- /// </returns>
- public char ReadChar()
- {
- short val = ReadShort();
-
- return *(char*)(&val);
- }
-
- /// <summary>
- /// Write char array.
- /// </summary>
- /// <param name="val">Char array.</param>
- public abstract void WriteCharArray(char[] val);
-
- /// <summary>
- /// Internal routine to write char array.
- /// </summary>
- /// <param name="val">Char array.</param>
- /// <param name="data">Data pointer.</param>
- /// <param name="cnt">Bytes count.</param>
- protected static void WriteCharArray0(char[] val, byte* data, int cnt)
- {
- if (LittleEndian)
- {
- fixed (char* val0 = val)
- {
- CopyMemory((byte*)val0, data, cnt);
- }
- }
- else
- {
- byte* curPos = data;
-
- for (int i = 0; i < val.Length; i++)
- {
- char val0 = val[i];
-
- byte* valPtr = (byte*)&(val0);
-
- *curPos++ = valPtr[1];
- *curPos++ = valPtr[0];
- }
- }
- }
-
- /// <summary>
- /// Read char array.
- /// </summary>
- /// <param name="cnt">Count.</param>
- /// <returns>
- /// Char array.
- /// </returns>
- public abstract char[] ReadCharArray(int cnt);
-
- /// <summary>
- /// Internal routine to read char array.
- /// </summary>
- /// <param name="len">Count.</param>
- /// <param name="data">Data pointer.</param>
- /// <param name="cnt">Bytes count.</param>
- /// <returns>Char array</returns>
- protected static char[] ReadCharArray0(int len, byte* data, int cnt)
- {
- char[] res = new char[len];
-
- if (LittleEndian)
- {
- fixed (char* res0 = res)
- {
- CopyMemory(data, (byte*)res0, cnt);
- }
- }
- else
- {
- for (int i = 0; i < len; i++)
- {
- char val;
-
- byte* valPtr = (byte*)&val;
-
- valPtr[1] = *data++;
- valPtr[0] = *data++;
-
- res[i] = val;
- }
- }
-
- return res;
- }
-
- /// <summary>
- /// Write int.
- /// </summary>
- /// <param name="val">Int value.</param>
- public abstract void WriteInt(int val);
-
- /// <summary>
- /// Write int to specific position.
- /// </summary>
- /// <param name="writePos">Position.</param>
- /// <param name="val">Value.</param>
- public abstract void WriteInt(int writePos, int val);
-
- /// <summary>
- /// Internal routine to write int value.
- /// </summary>
- /// <param name="val">Int value.</param>
- /// <param name="data">Data pointer.</param>
- protected static void WriteInt0(int val, byte* data)
- {
- if (LittleEndian)
- *((int*)data) = val;
- else
- {
- byte* valPtr = (byte*)&val;
-
- data[0] = valPtr[3];
- data[1] = valPtr[2];
- data[2] = valPtr[1];
- data[3] = valPtr[0];
- }
- }
-
- /// <summary>
- /// Read int.
- /// </summary>
- /// <returns>
- /// Int value.
- /// </returns>
- public abstract int ReadInt();
-
- /// <summary>
- /// Internal routine to read int value.
- /// </summary>
- /// <param name="data">Data pointer.</param>
- /// <returns>Int value</returns>
- protected static int ReadInt0(byte* data) {
- int val;
-
- if (LittleEndian)
- val = *((int*)data);
- else
- {
- byte* valPtr = (byte*)&val;
-
- valPtr[0] = data[3];
- valPtr[1] = data[2];
- valPtr[2] = data[1];
- valPtr[3] = data[0];
- }
-
- return val;
- }
-
- /// <summary>
- /// Write int array.
- /// </summary>
- /// <param name="val">Int array.</param>
- public abstract void WriteIntArray(int[] val);
-
- /// <summary>
- /// Internal routine to write int array.
- /// </summary>
- /// <param name="val">Int array.</param>
- /// <param name="data">Data pointer.</param>
- /// <param name="cnt">Bytes count.</param>
- protected static void WriteIntArray0(int[] val, byte* data, int cnt)
- {
- if (LittleEndian)
- {
- fixed (int* val0 = val)
- {
- CopyMemory((byte*)val0, data, cnt);
- }
- }
- else
- {
- byte* curPos = data;
-
- for (int i = 0; i < val.Length; i++)
- {
- int val0 = val[i];
-
- byte* valPtr = (byte*)&(val0);
-
- *curPos++ = valPtr[3];
- *curPos++ = valPtr[2];
- *curPos++ = valPtr[1];
- *curPos++ = valPtr[0];
- }
- }
- }
-
- /// <summary>
- /// Read int array.
- /// </summary>
- /// <param name="cnt">Count.</param>
- /// <returns>
- /// Int array.
- /// </returns>
- public abstract int[] ReadIntArray(int cnt);
-
- /// <summary>
- /// Internal routine to read int array.
- /// </summary>
- /// <param name="len">Count.</param>
- /// <param name="data">Data pointer.</param>
- /// <param name="cnt">Bytes count.</param>
- /// <returns>Int array</returns>
- protected static int[] ReadIntArray0(int len, byte* data, int cnt)
- {
- int[] res = new int[len];
-
- if (LittleEndian)
- {
- fixed (int* res0 = res)
- {
- CopyMemory(data, (byte*)res0, cnt);
- }
- }
- else
- {
- for (int i = 0; i < len; i++)
- {
- int val;
-
- byte* valPtr = (byte*)&val;
-
- valPtr[3] = *data++;
- valPtr[2] = *data++;
- valPtr[1] = *data++;
- valPtr[0] = *data++;
-
- res[i] = val;
- }
- }
-
- return res;
- }
-
- /// <summary>
- /// Write float.
- /// </summary>
- /// <param name="val">Float value.</param>
- public void WriteFloat(float val)
- {
- int val0 = *(int*)(&val);
-
- WriteInt(val0);
- }
-
- /// <summary>
- /// Read float.
- /// </summary>
- /// <returns>
- /// Float value.
- /// </returns>
- public float ReadFloat()
- {
- int val = ReadInt();
-
- return BinaryUtils.IntToFloatBits(val);
- }
-
- /// <summary>
- /// Write float array.
- /// </summary>
- /// <param name="val">Float array.</param>
- public abstract void WriteFloatArray(float[] val);
-
- /// <summary>
- /// Internal routine to write float array.
- /// </summary>
- /// <param name="val">Int array.</param>
- /// <param name="data">Data pointer.</param>
- /// <param name="cnt">Bytes count.</param>
- protected static void WriteFloatArray0(float[] val, byte* data, int cnt)
- {
- if (LittleEndian)
- {
- fixed (float* val0 = val)
- {
- CopyMemory((byte*)val0, data, cnt);
- }
- }
- else
- {
- byte* curPos = data;
-
- for (int i = 0; i < val.Length; i++)
- {
- float val0 = val[i];
-
- byte* valPtr = (byte*)&(val0);
-
- *curPos++ = valPtr[3];
- *curPos++ = valPtr[2];
- *curPos++ = valPtr[1];
- *curPos++ = valPtr[0];
- }
- }
- }
-
- /// <summary>
- /// Read float array.
- /// </summary>
- /// <param name="cnt">Count.</param>
- /// <returns>
- /// Float array.
- /// </returns>
- public abstract float[] ReadFloatArray(int cnt);
-
- /// <summary>
- /// Internal routine to read float array.
- /// </summary>
- /// <param name="len">Count.</param>
- /// <param name="data">Data pointer.</param>
- /// <param name="cnt">Bytes count.</param>
- /// <returns>Float array</returns>
- protected static float[] ReadFloatArray0(int len, byte* data, int cnt)
- {
- float[] res = new float[len];
-
- if (LittleEndian)
- {
- fixed (float* res0 = res)
- {
- CopyMemory(data, (byte*)res0, cnt);
- }
- }
- else
- {
- for (int i = 0; i < len; i++)
- {
- int val;
-
- byte* valPtr = (byte*)&val;
-
- valPtr[3] = *data++;
- valPtr[2] = *data++;
- valPtr[1] = *data++;
- valPtr[0] = *data++;
-
- res[i] = val;
- }
- }
-
- return res;
- }
-
- /// <summary>
- /// Write long.
- /// </summary>
- /// <param name="val">Long value.</param>
- public abstract void WriteLong(long val);
-
- /// <summary>
- /// Internal routine to write long value.
- /// </summary>
- /// <param name="val">Long value.</param>
- /// <param name="data">Data pointer.</param>
- protected static void WriteLong0(long val, byte* data)
- {
- if (LittleEndian)
- *((long*)data) = val;
- else
- {
- byte* valPtr = (byte*)&val;
-
- data[0] = valPtr[7];
- data[1] = valPtr[6];
- data[2] = valPtr[5];
- data[3] = valPtr[4];
- data[4] = valPtr[3];
- data[5] = valPtr[2];
- data[6] = valPtr[1];
- data[7] = valPtr[0];
- }
- }
-
- /// <summary>
- /// Read long.
- /// </summary>
- /// <returns>
- /// Long value.
- /// </returns>
- public abstract long ReadLong();
-
- /// <summary>
- /// Internal routine to read long value.
- /// </summary>
- /// <param name="data">Data pointer.</param>
- /// <returns>Long value</returns>
- protected static long ReadLong0(byte* data)
- {
- long val;
-
- if (LittleEndian)
- val = *((long*)data);
- else
- {
- byte* valPtr = (byte*)&val;
-
- valPtr[0] = data[7];
- valPtr[1] = data[6];
- valPtr[2] = data[5];
- valPtr[3] = data[4];
- valPtr[4] = data[3];
- valPtr[5] = data[2];
- valPtr[6] = data[1];
- valPtr[7] = data[0];
- }
-
- return val;
- }
-
- /// <summary>
- /// Write long array.
- /// </summary>
- /// <param name="val">Long array.</param>
- public abstract void WriteLongArray(long[] val);
-
- /// <summary>
- /// Internal routine to write long array.
- /// </summary>
- /// <param name="val">Long array.</param>
- /// <param name="data">Data pointer.</param>
- /// <param name="cnt">Bytes count.</param>
- protected static void WriteLongArray0(long[] val, byte* data, int cnt)
- {
- if (LittleEndian)
- {
- fixed (long* val0 = val)
- {
- CopyMemory((byte*)val0, data, cnt);
- }
- }
- else
- {
- byte* curPos = data;
-
- for (int i = 0; i < val.Length; i++)
- {
- long val0 = val[i];
-
- byte* valPtr = (byte*)&(val0);
-
- *curPos++ = valPtr[7];
- *curPos++ = valPtr[6];
- *curPos++ = valPtr[5];
- *curPos++ = valPtr[4];
- *curPos++ = valPtr[3];
- *curPos++ = valPtr[2];
- *curPos++ = valPtr[1];
- *curPos++ = valPtr[0];
- }
- }
- }
-
- /// <summary>
- /// Read long array.
- /// </summary>
- /// <param name="cnt">Count.</param>
- /// <returns>
- /// Long array.
- /// </returns>
- public abstract long[] ReadLongArray(int cnt);
-
- /// <summary>
- /// Internal routine to read long array.
- /// </summary>
- /// <param name="len">Count.</param>
- /// <param name="data">Data pointer.</param>
- /// <param name="cnt">Bytes count.</param>
- /// <returns>Long array</returns>
- protected static long[] ReadLongArray0(int len, byte* data, int cnt)
- {
- long[] res = new long[len];
-
- if (LittleEndian)
- {
- fixed (long* res0 = res)
- {
- CopyMemory(data, (byte*)res0, cnt);
- }
- }
- else
- {
- for (int i = 0; i < len; i++)
- {
- long val;
-
- byte* valPtr = (byte*)&val;
-
- valPtr[7] = *data++;
- valPtr[6] = *data++;
- valPtr[5] = *data++;
- valPtr[4] = *data++;
- valPtr[3] = *data++;
- valPtr[2] = *data++;
- valPtr[1] = *data++;
- valPtr[0] = *data++;
-
- res[i] = val;
- }
- }
-
- return res;
- }
-
- /// <summary>
- /// Write double.
- /// </summary>
- /// <param name="val">Double value.</param>
- public void WriteDouble(double val)
- {
- long val0 = *(long*)(&val);
-
- WriteLong(val0);
- }
-
- /// <summary>
- /// Read double.
- /// </summary>
- /// <returns>
- /// Double value.
- /// </returns>
- public double ReadDouble()
- {
- long val = ReadLong();
-
- return BinaryUtils.LongToDoubleBits(val);
- }
-
- /// <summary>
- /// Write double array.
- /// </summary>
- /// <param name="val">Double array.</param>
- public abstract void WriteDoubleArray(double[] val);
-
- /// <summary>
- /// Internal routine to write double array.
- /// </summary>
- /// <param name="val">Double array.</param>
- /// <param name="data">Data pointer.</param>
- /// <param name="cnt">Bytes count.</param>
- protected static void WriteDoubleArray0(double[] val, byte* data, int cnt)
- {
- if (LittleEndian)
- {
- fixed (double* val0 = val)
- {
- CopyMemory((byte*)val0, data, cnt);
- }
- }
- else
- {
- byte* curPos = data;
-
- for (int i = 0; i < val.Length; i++)
- {
- double val0 = val[i];
-
- byte* valPtr = (byte*)&(val0);
-
- *curPos++ = valPtr[7];
- *curPos++ = valPtr[6];
- *curPos++ = valPtr[5];
- *curPos++ = valPtr[4];
- *curPos++ = valPtr[3];
- *curPos++ = valPtr[2];
- *curPos++ = valPtr[1];
- *curPos++ = valPtr[0];
- }
- }
- }
-
- /// <summary>
- /// Read double array.
- /// </summary>
- /// <param name="cnt">Count.</param>
- /// <returns>
- /// Double array.
- /// </returns>
- public abstract double[] ReadDoubleArray(int cnt);
-
- /// <summary>
- /// Internal routine to read double array.
- /// </summary>
- /// <param name="len">Count.</param>
- /// <param name="data">Data pointer.</param>
- /// <param name="cnt">Bytes count.</param>
- /// <returns>Double array</returns>
- protected static double[] ReadDoubleArray0(int len, byte* data, int cnt)
- {
- double[] res = new double[len];
-
- if (LittleEndian)
- {
- fixed (double* res0 = res)
- {
- CopyMemory(data, (byte*)res0, cnt);
- }
- }
- else
- {
- for (int i = 0; i < len; i++)
- {
- double val;
-
- byte* valPtr = (byte*)&val;
-
- valPtr[7] = *data++;
- valPtr[6] = *data++;
- valPtr[5] = *data++;
- valPtr[4] = *data++;
- valPtr[3] = *data++;
- valPtr[2] = *data++;
- valPtr[1] = *data++;
- valPtr[0] = *data++;
-
- res[i] = val;
- }
- }
-
- return res;
- }
-
- /// <summary>
- /// Write string.
- /// </summary>
- /// <param name="chars">Characters.</param>
- /// <param name="charCnt">Char count.</param>
- /// <param name="byteCnt">Byte count.</param>
- /// <param name="encoding">Encoding.</param>
- /// <returns>
- /// Amounts of bytes written.
- /// </returns>
- public abstract int WriteString(char* chars, int charCnt, int byteCnt, Encoding encoding);
-
- /// <summary>
- /// Write arbitrary data.
- /// </summary>
- /// <param name="src">Source array.</param>
- /// <param name="off">Offset</param>
- /// <param name="cnt">Count.</param>
- public void Write(byte[] src, int off, int cnt)
- {
- fixed (byte* src0 = src)
- {
- Write(src0 + off, cnt);
- }
- }
-
- /// <summary>
- /// Read arbitrary data.
- /// </summary>
- /// <param name="dest">Destination array.</param>
- /// <param name="off">Offset.</param>
- /// <param name="cnt">Count.</param>
- /// <returns>
- /// Amount of bytes read.
- /// </returns>
- public void Read(byte[] dest, int off, int cnt)
- {
- fixed (byte* dest0 = dest)
- {
- Read(dest0 + off, cnt);
- }
- }
-
- /// <summary>
- /// Write arbitrary data.
- /// </summary>
- /// <param name="src">Source.</param>
- /// <param name="cnt">Count.</param>
- public abstract void Write(byte* src, int cnt);
-
- /// <summary>
- /// Internal write routine.
- /// </summary>
- /// <param name="src">Source.</param>
- /// <param name="cnt">Count.</param>
- /// <param name="data">Data (dsetination).</param>
- protected void WriteInternal(byte* src, int cnt, byte* data)
- {
- CopyMemory(src, data + Pos, cnt);
- }
-
- /// <summary>
- /// Read arbitrary data.
- /// </summary>
- /// <param name="dest">Destination.</param>
- /// <param name="cnt">Count.</param>
- /// <returns></returns>
- public abstract void Read(byte* dest, int cnt);
-
- /// <summary>
- /// Internal read routine.
- /// </summary>
- /// <param name="src">Source</param>
- /// <param name="dest">Destination.</param>
- /// <param name="cnt">Count.</param>
- /// <returns>Amount of bytes written.</returns>
- protected void ReadInternal(byte* src, byte* dest, int cnt)
- {
- int cnt0 = Math.Min(Remaining, cnt);
-
- CopyMemory(src + Pos, dest, cnt0);
-
- ShiftRead(cnt0);
- }
-
- /// <summary>
- /// Position.
- /// </summary>
- public int Position
- {
- get { return Pos; }
- }
-
- /// <summary>
- /// Gets remaining bytes in the stream.
- /// </summary>
- /// <value>
- /// Remaining bytes.
- /// </value>
- public abstract int Remaining { get; }
-
- /// <summary>
- /// Gets underlying array, avoiding copying if possible.
- /// </summary>
- /// <returns>
- /// Underlying array.
- /// </returns>
- public abstract byte[] GetArray();
-
- /// <summary>
- /// Gets underlying data in a new array.
- /// </summary>
- /// <returns>
- /// New array with data.
- /// </returns>
- public abstract byte[] GetArrayCopy();
-
- /// <summary>
- /// Check whether array passed as argument is the same as the stream hosts.
- /// </summary>
- /// <param name="arr">Array.</param>
- /// <returns>
- /// <c>True</c> if they are same.
- /// </returns>
- public abstract bool IsSameArray(byte[] arr);
-
- /// <summary>
- /// Seek to the given position.
- /// </summary>
- /// <param name="offset">Offset.</param>
- /// <param name="origin">Seek origin.</param>
- /// <returns>
- /// Position.
- /// </returns>
- /// <exception cref="System.ArgumentException">
- /// Unsupported seek origin: + origin
- /// or
- /// Seek before origin: + newPos
- /// </exception>
- public int Seek(int offset, SeekOrigin origin)
- {
- int newPos;
-
- switch (origin)
- {
- case SeekOrigin.Begin:
- {
- newPos = offset;
-
- break;
- }
-
- case SeekOrigin.Current:
- {
- newPos = Pos + offset;
-
- break;
- }
-
- default:
- throw new ArgumentException("Unsupported seek origin: " + origin);
- }
-
- if (newPos < 0)
- throw new ArgumentException("Seek before origin: " + newPos);
-
- EnsureWriteCapacity(newPos);
-
- Pos = newPos;
-
- return Pos;
- }
-
- /// <summary>
- /// Returns a hash code for the specified byte range.
- /// </summary>
- public abstract T Apply<TArg, T>(IBinaryStreamProcessor<TArg, T> proc, TArg arg);
-
- /// <summary>
- /// Flushes the data to underlying storage.
- /// </summary>
- public void Flush()
- {
- // No-op.
- }
-
- /** <inheritdoc /> */
- public void Dispose()
- {
- if (_disposed)
- return;
-
- Dispose(true);
-
- GC.SuppressFinalize(this);
-
- _disposed = true;
- }
-
- /// <summary>
- /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
- /// </summary>
- protected abstract void Dispose(bool disposing);
-
- /// <summary>
- /// Ensure capacity for write.
- /// </summary>
- /// <param name="cnt">Bytes count.</param>
- protected abstract void EnsureWriteCapacity(int cnt);
-
- /// <summary>
- /// Ensure capacity for write and shift position.
- /// </summary>
- /// <param name="cnt">Bytes count.</param>
- /// <returns>Position before shift.</returns>
- protected int EnsureWriteCapacityAndShift(int cnt)
- {
- int pos0 = Pos;
-
- EnsureWriteCapacity(Pos + cnt);
-
- ShiftWrite(cnt);
-
- return pos0;
- }
-
- /// <summary>
- /// Ensure capacity for read.
- /// </summary>
- /// <param name="cnt">Bytes count.</param>
- protected abstract void EnsureReadCapacity(int cnt);
-
- /// <summary>
- /// Ensure capacity for read and shift position.
- /// </summary>
- /// <param name="cnt">Bytes count.</param>
- /// <returns>Position before shift.</returns>
- protected int EnsureReadCapacityAndShift(int cnt)
- {
- int pos0 = Pos;
-
- EnsureReadCapacity(cnt);
-
- ShiftRead(cnt);
-
- return pos0;
- }
-
- /// <summary>
- /// Shift position due to write
- /// </summary>
- /// <param name="cnt">Bytes count.</param>
- protected void ShiftWrite(int cnt)
- {
- Pos += cnt;
- }
-
- /// <summary>
- /// Shift position due to read.
- /// </summary>
- /// <param name="cnt">Bytes count.</param>
- private void ShiftRead(int cnt)
- {
- Pos += cnt;
- }
-
- /// <summary>
- /// Calculate new capacity.
- /// </summary>
- /// <param name="curCap">Current capacity.</param>
- /// <param name="reqCap">Required capacity.</param>
- /// <returns>New capacity.</returns>
- protected static int Capacity(int curCap, int reqCap)
- {
- int newCap;
-
- if (reqCap < 256)
- newCap = 256;
- else
- {
- newCap = curCap << 1;
-
- if (newCap < reqCap)
- newCap = reqCap;
- }
-
- return newCap;
- }
-
- /// <summary>
- /// Unsafe memory copy routine.
- /// </summary>
- /// <param name="src">Source.</param>
- /// <param name="dest">Destination.</param>
- /// <param name="len">Length.</param>
- private static void CopyMemory(byte* src, byte* dest, int len)
- {
- PlatformMemoryUtils.CopyMemory(src, dest, len);
- }
- }
-}
[5/6] ignite git commit: Merge remote-tracking branch
'remotes/origin/master' into ignite-3478
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/master' into ignite-3478
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/23742962
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/23742962
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/23742962
Branch: refs/heads/ignite-3478
Commit: 23742962f8d539aac33a7ac953f09a1407b330e9
Parents: 970cf47 b8b7c50
Author: sboikov <sb...@gridgain.com>
Authored: Thu Oct 12 13:36:52 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Oct 12 13:36:52 2017 +0300
----------------------------------------------------------------------
.../internal/MarshallerMappingFileStore.java | 15 +-
.../service/GridServiceProcessor.java | 11 +-
.../nio/GridAbstractCommunicationClient.java | 2 +-
.../communication/tcp/TcpCommunicationSpi.java | 378 +++---
.../IgniteMarshallerCacheFSRestoreTest.java | 71 +-
.../Apache.Ignite.Core.csproj | 1 -
.../Impl/Binary/Io/BinaryHeapStream.cs | 1018 +++++++++++++-
.../Impl/Binary/Io/BinaryStreamBase.cs | 1249 ------------------
8 files changed, 1237 insertions(+), 1508 deletions(-)
----------------------------------------------------------------------