You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/10/12 11:24:15 UTC
[01/10] ignite git commit: IGNITE-6545: Failure during Ignite
Service.cancel() can break normal shutdown process. This closes #2807.
Repository: ignite
Updated Branches:
refs/heads/ignite-5932 178006226 -> b73792aec
IGNITE-6545: Failure during Ignite Service.cancel() can break normal shutdown process. This closes #2807.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8ffa1099
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8ffa1099
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8ffa1099
Branch: refs/heads/ignite-5932
Commit: 8ffa1099e2afd14052f7c91be822b2aa3f5f2a8d
Parents: 0f3546a
Author: AMRepo <an...@gmail.com>
Authored: Tue Oct 10 11:57:20 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Wed Oct 11 12:00:26 2017 +0300
----------------------------------------------------------------------
.../processors/service/GridServiceProcessor.java | 11 ++++++++++-
1 file changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/8ffa1099/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 9272760..6f1dfc7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -316,7 +316,16 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
Service svc = ctx.service();
if (svc != null)
- svc.cancel(ctx);
+ try {
+ svc.cancel(ctx);
+ }
+ catch (Throwable e) {
+ log.error("Failed to cancel service (ignoring) [name=" + ctx.name() +
+ ", execId=" + ctx.executionId() + ']', e);
+
+ if (e instanceof Error)
+ throw e;
+ }
ctx.executor().shutdownNow();
}
[06/10] ignite git commit: Merge remote-tracking branch
'remotes/origin/master' into ignite-3478
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/master' into ignite-3478
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/23742962
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/23742962
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/23742962
Branch: refs/heads/ignite-5932
Commit: 23742962f8d539aac33a7ac953f09a1407b330e9
Parents: 970cf47 b8b7c50
Author: sboikov <sb...@gridgain.com>
Authored: Thu Oct 12 13:36:52 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Oct 12 13:36:52 2017 +0300
----------------------------------------------------------------------
.../internal/MarshallerMappingFileStore.java | 15 +-
.../service/GridServiceProcessor.java | 11 +-
.../nio/GridAbstractCommunicationClient.java | 2 +-
.../communication/tcp/TcpCommunicationSpi.java | 378 +++---
.../IgniteMarshallerCacheFSRestoreTest.java | 71 +-
.../Apache.Ignite.Core.csproj | 1 -
.../Impl/Binary/Io/BinaryHeapStream.cs | 1018 +++++++++++++-
.../Impl/Binary/Io/BinaryStreamBase.cs | 1249 ------------------
8 files changed, 1237 insertions(+), 1508 deletions(-)
----------------------------------------------------------------------
[03/10] ignite git commit: IGNITE-6536 Node fails when detects
mapping storage corruption
Posted by sb...@apache.org.
IGNITE-6536 Node fails when detects mapping storage corruption
Signed-off-by: Andrey Gura <ag...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5490c7d9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5490c7d9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5490c7d9
Branch: refs/heads/ignite-5932
Commit: 5490c7d927daca2d64f108b6c2eafe03bbd6f54e
Parents: b0158fb
Author: Sergey Chugunov <se...@gmail.com>
Authored: Wed Oct 11 15:33:23 2017 +0300
Committer: Andrey Gura <ag...@apache.org>
Committed: Wed Oct 11 15:34:06 2017 +0300
----------------------------------------------------------------------
.../internal/MarshallerMappingFileStore.java | 15 +++--
.../IgniteMarshallerCacheFSRestoreTest.java | 71 +++++++++++++++++++-
2 files changed, 77 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5490c7d9/modules/core/src/main/java/org/apache/ignite/internal/MarshallerMappingFileStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerMappingFileStore.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerMappingFileStore.java
index eabbdb8..59a99b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerMappingFileStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerMappingFileStore.java
@@ -167,16 +167,19 @@ final class MarshallerMappingFileStore {
try (FileInputStream in = new FileInputStream(file)) {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
- String className = reader.readLine();
+ String clsName = reader.readLine();
- marshCtx.registerClassNameLocally(platformId, typeId, className);
+ if (clsName == null) {
+ throw new IgniteCheckedException("Class name is null for [platformId=" + platformId +
+ ", typeId=" + typeId + "], marshaller mappings storage is broken. " +
+ "Clean up marshaller directory (<work_dir>/marshaller) and restart the node.");
+ }
+
+ marshCtx.registerClassNameLocally(platformId, typeId, clsName);
}
}
catch (IOException e) {
- throw new IgniteCheckedException("Reading marshaller mapping from file "
- + name
- + " failed."
- , e);
+ throw new IgniteCheckedException("Reading marshaller mapping from file " + name + " failed.", e);
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5490c7d9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheFSRestoreTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheFSRestoreTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheFSRestoreTest.java
index 21a3e43..ac15971 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheFSRestoreTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheFSRestoreTest.java
@@ -23,13 +23,16 @@ import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Map;
+import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.PersistentStoreConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.processors.marshaller.MappingProposedMessage;
@@ -47,6 +50,9 @@ public class IgniteMarshallerCacheFSRestoreTest extends GridCommonAbstractTest {
/** */
private volatile boolean isDuplicateObserved = true;
+ /** */
+ private boolean isPersistenceEnabled;
+
/**
*
*/
@@ -67,6 +73,7 @@ public class IgniteMarshallerCacheFSRestoreTest extends GridCommonAbstractTest {
}
}
+ /** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
@@ -75,13 +82,17 @@ public class IgniteMarshallerCacheFSRestoreTest extends GridCommonAbstractTest {
cfg.setDiscoverySpi(discoSpi);
- CacheConfiguration singleCacheConfig = new CacheConfiguration()
+ CacheConfiguration singleCacheCfg = new CacheConfiguration()
.setName(DEFAULT_CACHE_NAME)
.setCacheMode(CacheMode.PARTITIONED)
.setBackups(1)
.setAtomicityMode(CacheAtomicityMode.ATOMIC);
- cfg.setCacheConfiguration(singleCacheConfig);
+ cfg.setCacheConfiguration(singleCacheCfg);
+
+ //persistence must be enabled to verify restoring mappings from FS case
+ if (isPersistenceEnabled)
+ cfg.setPersistentStoreConfiguration(new PersistentStoreConfiguration());
return cfg;
}
@@ -110,11 +121,14 @@ public class IgniteMarshallerCacheFSRestoreTest extends GridCommonAbstractTest {
* In that case the request must not be marked as duplicate and must be processed in a regular way.
* No hangs must take place.
*
- * @see <a href="https://issues.apache.org/jira/browse/IGNITE-5401">IGNITE-5401</a> Take a look at JIRA ticket for more information about context of this test.
+ * @see <a href="https://issues.apache.org/jira/browse/IGNITE-5401">IGNITE-5401</a> JIRA ticket
+ * provides more information about context of this test.
*
* This test must never hang on proposing of MarshallerMapping.
*/
public void testFileMappingReadAndPropose() throws Exception {
+ isPersistenceEnabled = false;
+
prepareMarshallerFileStore();
IgniteEx ignite0 = startGrid(0);
@@ -162,6 +176,57 @@ public class IgniteMarshallerCacheFSRestoreTest extends GridCommonAbstractTest {
}
}
+ /**
+ * Verifies scenario that node with corrupted marshaller mapping store must fail on startup
+ * with appropriate error message.
+ *
+ * @see <a href="https://issues.apache.org/jira/browse/IGNITE-6536">IGNITE-6536</a> JIRA provides more information
+ * about this case.
+ */
+ public void testNodeStartFailsOnCorruptedStorage() throws Exception {
+ isPersistenceEnabled = true;
+
+ Ignite ig = startGrids(3);
+
+ ig.active(true);
+
+ ig.cache(DEFAULT_CACHE_NAME).put(0, new SimpleValue(0, "value0"));
+
+ stopAllGrids();
+
+ corruptMarshallerStorage();
+
+ try {
+ startGrid(0);
+ }
+ catch (IgniteCheckedException e) {
+ verifyException((IgniteCheckedException) e.getCause());
+ }
+ }
+
+ /**
+ * Class name for CustomClass class mapping file gets cleaned up from file system.
+ */
+ private void corruptMarshallerStorage() throws Exception {
+ String marshallerDir = U.defaultWorkDirectory() + File.separator + "marshaller";
+
+ File[] storedMappingsFiles = new File(marshallerDir).listFiles();
+
+ assert storedMappingsFiles.length == 1;
+
+ try (FileOutputStream out = new FileOutputStream(storedMappingsFiles[0])) {
+ out.getChannel().truncate(0);
+ }
+ }
+
+ /** */
+ private void verifyException(IgniteCheckedException e) throws Exception {
+ String msg = e.getMessage();
+
+ if (msg == null || !msg.contains("Class name is null"))
+ throw new Exception("Exception with unexpected message was thrown: " + msg, e);
+ }
+
/** */
private class TestTcpDiscoverySpi extends TcpDiscoverySpi {
[02/10] ignite git commit: IGNITE-6542 Reliably close SocketChannel
in TcpCommunicationSpi.
Posted by sb...@apache.org.
IGNITE-6542 Reliably close SocketChannel in TcpCommunicationSpi.
Also fix forceClose() in GridTcpNioCommunicationClient which became wrong when migrated from int to bool. - Fixes #2787.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b0158fb2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b0158fb2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b0158fb2
Branch: refs/heads/ignite-5932
Commit: b0158fb2ab6de20922cc1b19597c5e17dae0b527
Parents: 8ffa109
Author: Ilya Kasnacheev <il...@gmail.com>
Authored: Wed Oct 11 15:29:04 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Wed Oct 11 15:29:04 2017 +0300
----------------------------------------------------------------------
.../nio/GridAbstractCommunicationClient.java | 2 +-
.../communication/tcp/TcpCommunicationSpi.java | 378 ++++++++++---------
2 files changed, 192 insertions(+), 188 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b0158fb2/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
index 6302d84..ed7e929 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
@@ -59,7 +59,7 @@ public abstract class GridAbstractCommunicationClient implements GridCommunicati
/** {@inheritDoc} */
@Override public void forceClose() {
- closed.set(false);
+ closed.set(true);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/b0158fb2/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 7a54666..a0ee389 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -2894,12 +2894,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
}
try {
- safeHandshake(client,
- null,
- node.id(),
- timeoutHelper.nextTimeoutChunk(connTimeout0),
- null,
- null);
+ safeShmemHandshake(client, node.id(), timeoutHelper.nextTimeoutChunk(connTimeout0));
}
catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) {
client.forceClose();
@@ -3063,7 +3058,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
LinkedHashSet<InetSocketAddress> addrs = nodeAddresses(node);
- boolean conn = false;
GridCommunicationClient client = null;
IgniteCheckedException errs = null;
@@ -3079,7 +3073,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
int lastWaitingTimeout = 1;
- while (!conn) { // Reconnection on handshake timeout.
+ while (client == null) { // Reconnection on handshake timeout.
+ boolean needWait = false;
+
try {
SocketChannel ch = SocketChannel.open();
@@ -3111,7 +3107,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
return null;
}
- Long rcvCnt = null;
+ Long rcvCnt;
Map<Integer, Object> meta = new HashMap<>();
@@ -3132,7 +3128,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
Integer handshakeConnIdx = connIdx;
- rcvCnt = safeHandshake(ch,
+ rcvCnt = safeTcpHandshake(ch,
recoveryDesc,
node.id(),
timeoutHelper.nextTimeoutChunk(connTimeout0),
@@ -3140,34 +3136,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
handshakeConnIdx);
if (rcvCnt == ALREADY_CONNECTED) {
- recoveryDesc.release();
-
return null;
}
else if (rcvCnt == NODE_STOPPING) {
- recoveryDesc.release();
-
throw new ClusterTopologyCheckedException("Remote node started stop procedure: " + node.id());
}
else if (rcvCnt == NEED_WAIT) {
- recoveryDesc.release();
-
- U.closeQuiet(ch);
-
- if (lastWaitingTimeout < 60000)
- lastWaitingTimeout *= 2;
-
- U.sleep(lastWaitingTimeout);
+ needWait = true;
continue;
}
- }
- finally {
- if (recoveryDesc != null && rcvCnt == null)
- recoveryDesc.release();
- }
- try {
meta.put(CONN_IDX_META, connKey);
if (recoveryDesc != null) {
@@ -3179,13 +3158,20 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
GridNioSession ses = nioSrvr.createSession(ch, meta, false, null).get();
client = new GridTcpNioCommunicationClient(connIdx, ses, log);
-
- conn = true;
}
finally {
- if (!conn) {
+ if (client == null) {
+ U.closeQuiet(ch);
+
if (recoveryDesc != null)
recoveryDesc.release();
+
+ if (needWait) {
+ if (lastWaitingTimeout < 60000)
+ lastWaitingTimeout *= 2;
+
+ U.sleep(lastWaitingTimeout);
+ }
}
}
}
@@ -3307,7 +3293,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
}
}
- if (conn)
+ if (client != null)
break;
}
@@ -3362,6 +3348,42 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
* Performs handshake in timeout-safe way.
*
* @param client Client.
+ * @param rmtNodeId Remote node.
+ * @param timeout Timeout for handshake.
+ * @throws IgniteCheckedException If handshake failed or wasn't completed withing timeout.
+ */
+ @SuppressWarnings("ThrowFromFinallyBlock")
+ private void safeShmemHandshake(
+ GridCommunicationClient client,
+ UUID rmtNodeId,
+ long timeout
+ ) throws IgniteCheckedException {
+ HandshakeTimeoutObject<GridCommunicationClient> obj = new HandshakeTimeoutObject<>(client,
+ U.currentTimeMillis() + timeout);
+
+ addTimeoutObject(obj);
+
+ try {
+ client.doHandshake(new HandshakeClosure(rmtNodeId));
+ }
+ finally {
+ boolean cancelled = obj.cancel();
+
+ if (cancelled)
+ removeTimeoutObject(obj);
+
+ // Ignoring whatever happened after timeout - reporting only timeout event.
+ if (!cancelled)
+ throw new HandshakeTimeoutException(
+ new IgniteSpiOperationTimeoutException("Failed to perform handshake due to timeout " +
+ "(consider increasing 'connectionTimeout' configuration property)."));
+ }
+ }
+
+ /**
+ * Performs handshake in timeout-safe way.
+ *
+ * @param ch Socket channel.
* @param recovery Recovery descriptor if use recovery handshake, otherwise {@code null}.
* @param rmtNodeId Remote node.
* @param timeout Timeout for handshake.
@@ -3371,233 +3393,215 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
* @return Handshake response.
*/
@SuppressWarnings("ThrowFromFinallyBlock")
- private <T> long safeHandshake(
- T client,
+ private long safeTcpHandshake(
+ SocketChannel ch,
@Nullable GridNioRecoveryDescriptor recovery,
UUID rmtNodeId,
long timeout,
GridSslMeta sslMeta,
@Nullable Integer handshakeConnIdx
) throws IgniteCheckedException {
- HandshakeTimeoutObject<T> obj = new HandshakeTimeoutObject<>(client, U.currentTimeMillis() + timeout);
+ HandshakeTimeoutObject obj = new HandshakeTimeoutObject<>(ch, U.currentTimeMillis() + timeout);
addTimeoutObject(obj);
long rcvCnt = 0;
try {
- if (client instanceof GridCommunicationClient)
- ((GridCommunicationClient)client).doHandshake(new HandshakeClosure(rmtNodeId));
- else {
- SocketChannel ch = (SocketChannel)client;
+ BlockingSslHandler sslHnd = null;
- boolean success = false;
+ ByteBuffer buf;
- try {
- BlockingSslHandler sslHnd = null;
+ if (isSslEnabled()) {
+ assert sslMeta != null;
- ByteBuffer buf;
+ sslHnd = new BlockingSslHandler(sslMeta.sslEngine(), ch, directBuf, ByteOrder.nativeOrder(), log);
- if (isSslEnabled()) {
- assert sslMeta != null;
+ if (!sslHnd.handshake())
+ throw new HandshakeException("SSL handshake is not completed.");
- sslHnd = new BlockingSslHandler(sslMeta.sslEngine(), ch, directBuf, ByteOrder.nativeOrder(), log);
+ ByteBuffer handBuff = sslHnd.applicationBuffer();
- if (!sslHnd.handshake())
- throw new HandshakeException("SSL handshake is not completed.");
+ if (handBuff.remaining() < NodeIdMessage.MESSAGE_FULL_SIZE) {
+ buf = ByteBuffer.allocate(1000);
- ByteBuffer handBuff = sslHnd.applicationBuffer();
+ int read = ch.read(buf);
- if (handBuff.remaining() < NodeIdMessage.MESSAGE_FULL_SIZE) {
- buf = ByteBuffer.allocate(1000);
+ if (read == -1)
+ throw new HandshakeException("Failed to read remote node ID (connection closed).");
- int read = ch.read(buf);
+ buf.flip();
- if (read == -1)
- throw new HandshakeException("Failed to read remote node ID (connection closed).");
-
- buf.flip();
-
- buf = sslHnd.decode(buf);
- }
- else
- buf = handBuff;
- }
- else {
- buf = ByteBuffer.allocate(NodeIdMessage.MESSAGE_FULL_SIZE);
+ buf = sslHnd.decode(buf);
+ }
+ else
+ buf = handBuff;
+ }
+ else {
+ buf = ByteBuffer.allocate(NodeIdMessage.MESSAGE_FULL_SIZE);
- for (int i = 0; i < NodeIdMessage.MESSAGE_FULL_SIZE; ) {
- int read = ch.read(buf);
+ for (int i = 0; i < NodeIdMessage.MESSAGE_FULL_SIZE; ) {
+ int read = ch.read(buf);
- if (read == -1)
- throw new HandshakeException("Failed to read remote node ID (connection closed).");
+ if (read == -1)
+ throw new HandshakeException("Failed to read remote node ID (connection closed).");
- i += read;
- }
- }
+ i += read;
+ }
+ }
- UUID rmtNodeId0 = U.bytesToUuid(buf.array(), Message.DIRECT_TYPE_SIZE);
+ UUID rmtNodeId0 = U.bytesToUuid(buf.array(), Message.DIRECT_TYPE_SIZE);
- if (!rmtNodeId.equals(rmtNodeId0))
- throw new HandshakeException("Remote node ID is not as expected [expected=" + rmtNodeId +
- ", rcvd=" + rmtNodeId0 + ']');
- else if (log.isDebugEnabled())
- log.debug("Received remote node ID: " + rmtNodeId0);
+ if (!rmtNodeId.equals(rmtNodeId0))
+ throw new HandshakeException("Remote node ID is not as expected [expected=" + rmtNodeId +
+ ", rcvd=" + rmtNodeId0 + ']');
+ else if (log.isDebugEnabled())
+ log.debug("Received remote node ID: " + rmtNodeId0);
- if (isSslEnabled()) {
- assert sslHnd != null;
+ if (isSslEnabled()) {
+ assert sslHnd != null;
- ch.write(sslHnd.encrypt(ByteBuffer.wrap(U.IGNITE_HEADER)));
- }
- else
- ch.write(ByteBuffer.wrap(U.IGNITE_HEADER));
+ ch.write(sslHnd.encrypt(ByteBuffer.wrap(U.IGNITE_HEADER)));
+ }
+ else
+ ch.write(ByteBuffer.wrap(U.IGNITE_HEADER));
- ClusterNode locNode = getLocalNode();
+ ClusterNode locNode = getLocalNode();
- if (locNode == null)
- throw new IgniteCheckedException("Local node has not been started or " +
- "fully initialized [isStopping=" + getSpiContext().isStopping() + ']');
+ if (locNode == null)
+ throw new IgniteCheckedException("Local node has not been started or " +
+ "fully initialized [isStopping=" + getSpiContext().isStopping() + ']');
- if (recovery != null) {
- HandshakeMessage msg;
+ if (recovery != null) {
+ HandshakeMessage msg;
- int msgSize = HandshakeMessage.MESSAGE_FULL_SIZE;
+ int msgSize = HandshakeMessage.MESSAGE_FULL_SIZE;
- if (handshakeConnIdx != null) {
- msg = new HandshakeMessage2(locNode.id(),
- recovery.incrementConnectCount(),
- recovery.received(),
- handshakeConnIdx);
+ if (handshakeConnIdx != null) {
+ msg = new HandshakeMessage2(locNode.id(),
+ recovery.incrementConnectCount(),
+ recovery.received(),
+ handshakeConnIdx);
- msgSize += 4;
- }
- else {
- msg = new HandshakeMessage(locNode.id(),
- recovery.incrementConnectCount(),
- recovery.received());
- }
+ msgSize += 4;
+ }
+ else {
+ msg = new HandshakeMessage(locNode.id(),
+ recovery.incrementConnectCount(),
+ recovery.received());
+ }
- if (log.isDebugEnabled())
- log.debug("Writing handshake message [locNodeId=" + locNode.id() +
- ", rmtNode=" + rmtNodeId + ", msg=" + msg + ']');
+ if (log.isDebugEnabled())
+ log.debug("Writing handshake message [locNodeId=" + locNode.id() +
+ ", rmtNode=" + rmtNodeId + ", msg=" + msg + ']');
- buf = ByteBuffer.allocate(msgSize);
+ buf = ByteBuffer.allocate(msgSize);
- buf.order(ByteOrder.nativeOrder());
+ buf.order(ByteOrder.nativeOrder());
- boolean written = msg.writeTo(buf, null);
+ boolean written = msg.writeTo(buf, null);
- assert written;
+ assert written;
- buf.flip();
+ buf.flip();
- if (isSslEnabled()) {
- assert sslHnd != null;
+ if (isSslEnabled()) {
+ assert sslHnd != null;
- ch.write(sslHnd.encrypt(buf));
- }
- else
- ch.write(buf);
- }
- else {
- if (isSslEnabled()) {
- assert sslHnd != null;
+ ch.write(sslHnd.encrypt(buf));
+ }
+ else
+ ch.write(buf);
+ }
+ else {
+ if (isSslEnabled()) {
+ assert sslHnd != null;
- ch.write(sslHnd.encrypt(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType)));
- }
- else
- ch.write(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType));
- }
+ ch.write(sslHnd.encrypt(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType)));
+ }
+ else
+ ch.write(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType));
+ }
- if (recovery != null) {
- if (log.isDebugEnabled())
- log.debug("Waiting for handshake [rmtNode=" + rmtNodeId + ']');
+ if (recovery != null) {
+ if (log.isDebugEnabled())
+ log.debug("Waiting for handshake [rmtNode=" + rmtNodeId + ']');
- if (isSslEnabled()) {
- assert sslHnd != null;
+ if (isSslEnabled()) {
+ assert sslHnd != null;
- buf = ByteBuffer.allocate(1000);
- buf.order(ByteOrder.nativeOrder());
+ buf = ByteBuffer.allocate(1000);
+ buf.order(ByteOrder.nativeOrder());
- ByteBuffer decode = ByteBuffer.allocate(2 * buf.capacity());
- decode.order(ByteOrder.nativeOrder());
+ ByteBuffer decode = ByteBuffer.allocate(2 * buf.capacity());
+ decode.order(ByteOrder.nativeOrder());
- for (int i = 0; i < RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE; ) {
- int read = ch.read(buf);
+ for (int i = 0; i < RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE; ) {
+ int read = ch.read(buf);
- if (read == -1)
- throw new HandshakeException("Failed to read remote node recovery handshake " +
- "(connection closed).");
+ if (read == -1)
+ throw new HandshakeException("Failed to read remote node recovery handshake " +
+ "(connection closed).");
- buf.flip();
+ buf.flip();
- ByteBuffer decode0 = sslHnd.decode(buf);
+ ByteBuffer decode0 = sslHnd.decode(buf);
- i += decode0.remaining();
+ i += decode0.remaining();
- decode = appendAndResizeIfNeeded(decode, decode0);
+ decode = appendAndResizeIfNeeded(decode, decode0);
- buf.clear();
- }
+ buf.clear();
+ }
- decode.flip();
+ decode.flip();
- rcvCnt = decode.getLong(Message.DIRECT_TYPE_SIZE);
+ rcvCnt = decode.getLong(Message.DIRECT_TYPE_SIZE);
- if (decode.limit() > RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE) {
- decode.position(RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE);
+ if (decode.limit() > RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE) {
+ decode.position(RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE);
- sslMeta.decodedBuffer(decode);
- }
+ sslMeta.decodedBuffer(decode);
+ }
- ByteBuffer inBuf = sslHnd.inputBuffer();
+ ByteBuffer inBuf = sslHnd.inputBuffer();
- if (inBuf.position() > 0)
- sslMeta.encodedBuffer(inBuf);
- }
- else {
- buf = ByteBuffer.allocate(RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE);
+ if (inBuf.position() > 0)
+ sslMeta.encodedBuffer(inBuf);
+ }
+ else {
+ buf = ByteBuffer.allocate(RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE);
- buf.order(ByteOrder.nativeOrder());
+ buf.order(ByteOrder.nativeOrder());
- for (int i = 0; i < RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE; ) {
- int read = ch.read(buf);
+ for (int i = 0; i < RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE; ) {
+ int read = ch.read(buf);
- if (read == -1)
- throw new HandshakeException("Failed to read remote node recovery handshake " +
- "(connection closed).");
+ if (read == -1)
+ throw new HandshakeException("Failed to read remote node recovery handshake " +
+ "(connection closed).");
- i += read;
- }
+ i += read;
+ }
- rcvCnt = buf.getLong(Message.DIRECT_TYPE_SIZE);
- }
+ rcvCnt = buf.getLong(Message.DIRECT_TYPE_SIZE);
+ }
- if (log.isDebugEnabled())
- log.debug("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']');
+ if (log.isDebugEnabled())
+ log.debug("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']');
- if (rcvCnt == -1) {
- if (log.isDebugEnabled())
- log.debug("Connection rejected, will retry client creation [rmtNode=" + rmtNodeId + ']');
- }
- else
- success = true;
- }
- else
- success = true;
- }
- catch (IOException e) {
+ if (rcvCnt == -1) {
if (log.isDebugEnabled())
- log.debug("Failed to read from channel: " + e);
-
- throw new IgniteCheckedException("Failed to read from channel.", e);
- }
- finally {
- if (!success)
- U.closeQuiet(ch);
+ log.debug("Connection rejected, will retry client creation [rmtNode=" + rmtNodeId + ']');
}
}
}
+ catch (IOException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to read from channel: " + e);
+
+ throw new IgniteCheckedException("Failed to read from channel.", e);
+ }
finally {
boolean cancelled = obj.cancel();
[10/10] ignite git commit: ignite-5932
Posted by sb...@apache.org.
ignite-5932
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b73792ae
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b73792ae
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b73792ae
Branch: refs/heads/ignite-5932
Commit: b73792aece2cd05a294d46d6befe0496f7ab1772
Parents: 031928e
Author: sboikov <sb...@gridgain.com>
Authored: Thu Oct 12 13:52:51 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Oct 12 14:24:00 2017 +0300
----------------------------------------------------------------------
.../distributed/dht/GridDhtTxFinishFuture.java | 4 +-
.../near/GridNearTxFinishAndAckFuture.java | 4 +-
.../near/GridNearTxFinishFuture.java | 25 +++++++---
.../cache/distributed/near/GridNearTxLocal.java | 2 -
.../cache/mvcc/CacheCoordinatorsProcessor.java | 50 ++++++++++++--------
.../processors/cache/mvcc/MvccQueryTracker.java | 24 ++++++----
.../processors/cache/mvcc/TxMvccInfo.java | 12 +++--
.../cache/mvcc/CacheMvccTransactionsTest.java | 34 ++++++++++++-
8 files changed, 108 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b73792ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index d624e2c..cb2eaa5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -300,7 +300,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
assert mvccInfo != null;
- IgniteInternalFuture fut = cctx.coordinators().waitTxsFuture(mvccInfo.coordinator(), waitTxs);
+ IgniteInternalFuture fut = cctx.coordinators().waitTxsFuture(mvccInfo.coordinatorNodeId(), waitTxs);
add(fut);
@@ -412,7 +412,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
if (tx.onePhaseCommit())
return false;
- assert !commit || !tx.txState().mvccEnabled(cctx) || tx.mvccInfo() != null;
+ assert !commit || !tx.txState().mvccEnabled(cctx) || tx.mvccInfo() != null || F.isEmpty(tx.writeEntries());
boolean sync = tx.syncMode() == FULL_SYNC;
http://git-wip-us.apache.org/repos/asf/ignite/blob/b73792ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
index 5d8b77c..36efe2f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
@@ -61,9 +61,9 @@ public class GridNearTxFinishAndAckFuture extends GridFutureAdapter<IgniteIntern
TxMvccInfo mvccInfo = tx.mvccInfo();
if (qryTracker != null)
- ackFut = qryTracker.onTxFinish(mvccInfo, fut.context());
+ ackFut = qryTracker.onTxDone(mvccInfo, fut.context(), true);
else if (mvccInfo != null) {
- ackFut = fut.context().coordinators().ackTxCommit(mvccInfo.coordinator(),
+ ackFut = fut.context().coordinators().ackTxCommit(mvccInfo.coordinatorNodeId(),
mvccInfo.version(),
null);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b73792ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index a9b60d7..1116c02 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
@@ -403,6 +404,20 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
fut.getClass() == CheckRemoteTxMiniFuture.class;
}
+ /**
+ *
+ */
+ private void ackMvccCoordinatorOnRollback() {
+ TxMvccInfo mvccInfo = tx.mvccInfo();
+
+ MvccQueryTracker qryTracker = tx.mvccQueryTracker();
+
+ if (qryTracker != null)
+ qryTracker.onTxDone(mvccInfo, cctx, false);
+ else if (mvccInfo != null)
+ cctx.coordinators().ackTxRollback(mvccInfo.coordinatorNodeId(), mvccInfo.version(), null);
+ }
+
/** {@inheritDoc} */
@SuppressWarnings("ForLoopReplaceableByForEach")
public void finish(boolean commit, boolean clearThreadMap) {
@@ -421,11 +436,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
return;
}
- if (!commit && tx.mvccInfo() != null) {
- TxMvccInfo mvccInfo = tx.mvccInfo();
-
- cctx.coordinators().ackTxRollback(mvccInfo.coordinator(), mvccInfo.version());
- }
+ if (!commit)
+ ackMvccCoordinatorOnRollback();
try {
if (tx.localFinish(commit, clearThreadMap) || (!commit && tx.state() == UNKNOWN)) {
@@ -436,7 +448,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
assert mvccInfo != null;
- IgniteInternalFuture fut = cctx.coordinators().waitTxsFuture(mvccInfo.coordinator(), waitTxs);
+ IgniteInternalFuture fut = cctx.coordinators().waitTxsFuture(mvccInfo.coordinatorNodeId(),
+ waitTxs);
add(fut);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b73792ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index c774f93..51d842c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -3373,8 +3373,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
if (!FINISH_FUT_UPD.compareAndSet(this, null, fut0 = new GridNearTxFinishFuture<>(cctx, this, false)))
return chainFinishFuture(finishFut, false);
- cctx.mvcc().addFuture(fut0, fut0.futureId());
-
IgniteInternalFuture<?> prepFut = this.prepFut;
if (prepFut == null || prepFut.isDone()) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/b73792ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
index a9a5eba..a5a9b0a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
@@ -454,47 +454,57 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
ackFuts.put(fut.id, fut);
- MvccCoordinatorMessage msg;
+ CoordinatorAckRequestTx msg = createTxAckMessage(fut.id, updateVer, readVer);
+
+ try {
+ ctx.io().sendToGridTopic(crd, MSG_TOPIC, msg, MSG_POLICY);
+ }
+ catch (IgniteCheckedException e) {
+ if (ackFuts.remove(fut.id) != null) {
+ if (e instanceof ClusterTopologyCheckedException)
+ fut.onDone(); // No need to ack, finish without error.
+ else
+ fut.onDone(e);
+ }
+ }
+
+ return fut;
+ }
+
+ private CoordinatorAckRequestTx createTxAckMessage(long futId,
+ MvccCoordinatorVersion updateVer,
+ @Nullable MvccCoordinatorVersion readVer)
+ {
+ CoordinatorAckRequestTx msg;
if (readVer != null) {
long trackCntr = queryTrackCounter(readVer);
if (readVer.coordinatorVersion() == updateVer.coordinatorVersion()) {
- msg = new CoordinatorAckRequestTxAndQuery(fut.id,
+ msg = new CoordinatorAckRequestTxAndQuery(futId,
updateVer.counter(),
trackCntr);
}
else {
- msg = new CoordinatorAckRequestTxAndQueryEx(fut.id,
+ msg = new CoordinatorAckRequestTxAndQueryEx(futId,
updateVer.counter(),
readVer.coordinatorVersion(),
trackCntr);
}
}
else
- msg = new CoordinatorAckRequestTx(fut.id, updateVer.counter());
-
- try {
- ctx.io().sendToGridTopic(crd, MSG_TOPIC, msg, MSG_POLICY);
- }
- catch (IgniteCheckedException e) {
- if (ackFuts.remove(fut.id) != null) {
- if (e instanceof ClusterTopologyCheckedException)
- fut.onDone(); // No need to ack, finish without error.
- else
- fut.onDone(e);
- }
- }
+ msg = new CoordinatorAckRequestTx(futId, updateVer.counter());
- return fut;
+ return msg;
}
/**
* @param crdId Coordinator node ID.
- * @param mvccVer Transaction version.
+ * @param updateVer Transaction update version.
+ * @param readVer Transaction read version.
*/
- public void ackTxRollback(UUID crdId, MvccCoordinatorVersion mvccVer) {
- CoordinatorAckRequestTx msg = new CoordinatorAckRequestTx(0, mvccVer.counter());
+ public void ackTxRollback(UUID crdId, MvccCoordinatorVersion updateVer, @Nullable MvccCoordinatorVersion readVer) {
+ CoordinatorAckRequestTx msg = createTxAckMessage(0, updateVer, readVer);
msg.skipResponse(true);
http://git-wip-us.apache.org/repos/asf/ignite/blob/b73792ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
index e45b77c..0e3eb7b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
@@ -116,7 +116,13 @@ public class MvccQueryTracker {
cctx.shared().coordinators().ackQueryDone(mvccCrd0, mvccVer0);
}
- public IgniteInternalFuture<Void> onTxFinish(@Nullable TxMvccInfo mvccInfo, GridCacheSharedContext ctx) {
+ /**
+ * @param mvccInfo Mvcc update info.
+ * @param ctx Context.
+ * @param commit If {@code true} ack commit, otherwise rollback.
+ * @return Commit ack future.
+ */
+ public IgniteInternalFuture<Void> onTxDone(@Nullable TxMvccInfo mvccInfo, GridCacheSharedContext ctx, boolean commit) {
MvccCoordinator mvccCrd0 = null;
MvccCoordinatorVersion mvccVer0 = null;
@@ -131,24 +137,22 @@ public class MvccQueryTracker {
}
}
- if (mvccVer0 != null) {
+ assert mvccVer0 == null || mvccInfo == null || mvccInfo.coordinatorNodeId().equals(mvccCrd0.nodeId());
+
+ if (mvccVer0 != null || mvccInfo != null) {
if (mvccInfo == null) {
cctx.shared().coordinators().ackQueryDone(mvccCrd0, mvccVer0);
return null;
}
- else if (mvccInfo.coordinator().equals(mvccCrd0.nodeId()))
- return ctx.coordinators().ackTxCommit(mvccInfo.coordinator(), mvccInfo.version(), mvccVer0);
else {
- cctx.shared().coordinators().ackQueryDone(mvccCrd0, mvccVer0);
-
- return ctx.coordinators().ackTxCommit(mvccInfo.coordinator(), mvccInfo.version(), null);
+ if (commit)
+ return ctx.coordinators().ackTxCommit(mvccInfo.coordinatorNodeId(), mvccInfo.version(), null);
+ else
+ ctx.coordinators().ackTxRollback(mvccInfo.coordinatorNodeId(), mvccInfo.version(), null);
}
}
- if (mvccInfo != null)
- return ctx.coordinators().ackTxCommit(mvccInfo.coordinator(), mvccInfo.version(), null);
-
return null;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b73792ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/TxMvccInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/TxMvccInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/TxMvccInfo.java
index 428d707..2306110 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/TxMvccInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/TxMvccInfo.java
@@ -42,8 +42,8 @@ public class TxMvccInfo implements Message {
}
/**
- * @param crd
- * @param mvccVer
+ * @param crd Coordinator node ID.
+ * @param mvccVer Mvcc version.
*/
public TxMvccInfo(UUID crd, MvccCoordinatorVersion mvccVer) {
assert crd != null;
@@ -53,10 +53,16 @@ public class TxMvccInfo implements Message {
this.mvccVer = mvccVer;
}
- public UUID coordinator() {
+ /**
+ * @return Coordinator node ID.
+ */
+ public UUID coordinatorNodeId() {
return crd;
}
+ /**
+ * @return Mvcc version.
+ */
public MvccCoordinatorVersion version() {
return mvccVer;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b73792ae/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index 8964cd4..70b910b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -336,6 +336,21 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testActiveQueriesCleanup() throws Exception {
+ activeQueriesCleanup(false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testActiveQueriesCleanupTx() throws Exception {
+ activeQueriesCleanup(true);
+ }
+
+ /**
+ * @param tx If {@code true} tests reads inside transaction.
+ * @throws Exception If failed.
+ */
+ private void activeQueriesCleanup(final boolean tx) throws Exception {
startGridsMultiThreaded(SRVS);
client = true;
@@ -354,7 +369,11 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
@Override public void apply(Integer idx) {
ThreadLocalRandom rnd = ThreadLocalRandom.current();
- IgniteCache cache = ignite(idx % NODES).cache(DEFAULT_CACHE_NAME);
+ Ignite node = ignite(idx % NODES);
+
+ IgniteTransactions txs = node.transactions();
+
+ IgniteCache cache = node.cache(DEFAULT_CACHE_NAME);
while (System.currentTimeMillis() < stopTime) {
int keyCnt = rnd.nextInt(10) + 1;
@@ -364,7 +383,18 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
for (int i = 0; i < keyCnt; i++)
keys.add(rnd.nextInt());
- cache.getAll(keys);
+ if (tx) {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ cache.getAll(keys);
+
+ if (rnd.nextBoolean())
+ tx.commit();
+ else
+ tx.rollback();
+ }
+ }
+ else
+ cache.getAll(keys);
}
}
}, NODES * 2, "get-thread");
[09/10] ignite git commit: Merge remote-tracking branch
'remotes/origin/ignite-3478' into ignite-5932
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-3478' into ignite-5932
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/031928e9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/031928e9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/031928e9
Branch: refs/heads/ignite-5932
Commit: 031928e9f1ab1469370f500e57a4b88c0eb8580a
Parents: d6670e8 f29d4bc
Author: sboikov <sb...@gridgain.com>
Authored: Thu Oct 12 13:45:19 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Oct 12 13:45:19 2017 +0300
----------------------------------------------------------------------
.../internal/MarshallerMappingFileStore.java | 15 +-
.../cache/mvcc/CacheCoordinatorsProcessor.java | 12 +-
.../service/GridServiceProcessor.java | 11 +-
.../nio/GridAbstractCommunicationClient.java | 2 +-
.../communication/tcp/TcpCommunicationSpi.java | 378 +++---
.../IgniteMarshallerCacheFSRestoreTest.java | 71 +-
.../Apache.Ignite.Core.csproj | 1 -
.../Impl/Binary/Io/BinaryHeapStream.cs | 1018 +++++++++++++-
.../Impl/Binary/Io/BinaryStreamBase.cs | 1249 ------------------
9 files changed, 1247 insertions(+), 1510 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/031928e9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
----------------------------------------------------------------------
[05/10] ignite git commit: IGNITE-5928 .NET: Get rid of
BinaryStreamBase
Posted by sb...@apache.org.
IGNITE-5928 .NET: Get rid of BinaryStreamBase
This closes #2835
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b8b7c508
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b8b7c508
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b8b7c508
Branch: refs/heads/ignite-5932
Commit: b8b7c50865d37449616cc6dadeeaba84526945f4
Parents: 5490c7d
Author: Alexey Popov <ta...@gmail.com>
Authored: Thu Oct 12 12:44:00 2017 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Thu Oct 12 12:44:00 2017 +0300
----------------------------------------------------------------------
.../Apache.Ignite.Core.csproj | 1 -
.../Impl/Binary/Io/BinaryHeapStream.cs | 1018 +++++++++++++-
.../Impl/Binary/Io/BinaryStreamBase.cs | 1249 ------------------
3 files changed, 958 insertions(+), 1310 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b8b7c508/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
index 58abd26..446208a 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
@@ -431,7 +431,6 @@
<Compile Include="Impl\Messaging\Messaging.cs" />
<Compile Include="Impl\NativeMethods.cs" />
<Compile Include="Impl\Binary\IO\IBinaryStream.cs" />
- <Compile Include="Impl\Binary\IO\BinaryStreamBase.cs" />
<Compile Include="Impl\Binary\IO\BinaryHeapStream.cs" />
<Compile Include="Impl\Binary\IBinaryTypeDescriptor.cs" />
<Compile Include="Impl\Binary\IBinaryWriteAware.cs" />
http://git-wip-us.apache.org/repos/asf/ignite/blob/b8b7c508/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryHeapStream.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryHeapStream.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryHeapStream.cs
index a14da0a..a6082f1 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryHeapStream.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryHeapStream.cs
@@ -22,12 +22,28 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Text;
+ using Apache.Ignite.Core.Impl.Memory;
/// <summary>
/// Binary onheap stream.
/// </summary>
- internal unsafe class BinaryHeapStream : BinaryStreamBase
+ internal unsafe class BinaryHeapStream : IBinaryStream
{
+ /** Byte: zero. */
+ private const byte ByteZero = 0;
+
+ /** Byte: one. */
+ private const byte ByteOne = 1;
+
+ /** LITTLE_ENDIAN flag. */
+ private static readonly bool LittleEndian = BitConverter.IsLittleEndian;
+
+ /** Position. */
+ private int _pos;
+
+ /** Disposed flag. */
+ private bool _disposed;
+
/** Data array. */
private byte[] _data;
@@ -53,8 +69,903 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
_data = data;
}
+ /// <summary>
+ /// Internal routine to write byte array.
+ /// </summary>
+ /// <param name="val">Byte array.</param>
+ /// <param name="data">Data pointer.</param>
+ private static void WriteByteArray0(byte[] val, byte* data)
+ {
+ fixed (byte* val0 = val)
+ {
+ CopyMemory(val0, data, val.Length);
+ }
+ }
+
+ /// <summary>
+ /// Internal routine to read byte array.
+ /// </summary>
+ /// <param name="len">Array length.</param>
+ /// <param name="data">Data pointer.</param>
+ /// <returns>Byte array</returns>
+ private static byte[] ReadByteArray0(int len, byte* data)
+ {
+ byte[] res = new byte[len];
+
+ fixed (byte* res0 = res)
+ {
+ CopyMemory(data, res0, len);
+ }
+
+ return res;
+ }
+
+ /** <inheritdoc /> */
+ public void WriteBool(bool val)
+ {
+ WriteByte(val ? ByteOne : ByteZero);
+ }
+
+ /** <inheritdoc /> */
+ public bool ReadBool()
+ {
+ return ReadByte() == ByteOne;
+ }
+
+ /// <summary>
+ /// Internal routine to write bool array.
+ /// </summary>
+ /// <param name="val">Bool array.</param>
+ /// <param name="data">Data pointer.</param>
+ private static void WriteBoolArray0(bool[] val, byte* data)
+ {
+ fixed (bool* val0 = val)
+ {
+ CopyMemory((byte*)val0, data, val.Length);
+ }
+ }
+
+ /// <summary>
+ /// Internal routine to read bool array.
+ /// </summary>
+ /// <param name="len">Array length.</param>
+ /// <param name="data">Data pointer.</param>
+ /// <returns>Bool array</returns>
+ private static bool[] ReadBoolArray0(int len, byte* data)
+ {
+ bool[] res = new bool[len];
+
+ fixed (bool* res0 = res)
+ {
+ CopyMemory(data, (byte*)res0, len);
+ }
+
+ return res;
+ }
+
+ /// <summary>
+ /// Internal routine to write short value.
+ /// </summary>
+ /// <param name="val">Short value.</param>
+ /// <param name="data">Data pointer.</param>
+ private static void WriteShort0(short val, byte* data)
+ {
+ if (LittleEndian)
+ *((short*)data) = val;
+ else
+ {
+ byte* valPtr = (byte*)&val;
+
+ data[0] = valPtr[1];
+ data[1] = valPtr[0];
+ }
+ }
+
+ /// <summary>
+ /// Internal routine to read short value.
+ /// </summary>
+ /// <param name="data">Data pointer.</param>
+ /// <returns>Short value</returns>
+ private static short ReadShort0(byte* data)
+ {
+ short val;
+
+ if (LittleEndian)
+ val = *((short*)data);
+ else
+ {
+ byte* valPtr = (byte*)&val;
+
+ valPtr[0] = data[1];
+ valPtr[1] = data[0];
+ }
+
+ return val;
+ }
+
+ /// <summary>
+ /// Internal routine to write short array.
+ /// </summary>
+ /// <param name="val">Short array.</param>
+ /// <param name="data">Data pointer.</param>
+ /// <param name="cnt">Bytes count.</param>
+ private static void WriteShortArray0(short[] val, byte* data, int cnt)
+ {
+ if (LittleEndian)
+ {
+ fixed (short* val0 = val)
+ {
+ CopyMemory((byte*)val0, data, cnt);
+ }
+ }
+ else
+ {
+ byte* curPos = data;
+
+ for (int i = 0; i < val.Length; i++)
+ {
+ short val0 = val[i];
+
+ byte* valPtr = (byte*)&(val0);
+
+ *curPos++ = valPtr[1];
+ *curPos++ = valPtr[0];
+ }
+ }
+ }
+
+ /// <summary>
+ /// Internal routine to read short array.
+ /// </summary>
+ /// <param name="len">Array length.</param>
+ /// <param name="data">Data pointer.</param>
+ /// <param name="cnt">Bytes count.</param>
+ /// <returns>Short array</returns>
+ private static short[] ReadShortArray0(int len, byte* data, int cnt)
+ {
+ short[] res = new short[len];
+
+ if (LittleEndian)
+ {
+ fixed (short* res0 = res)
+ {
+ CopyMemory(data, (byte*)res0, cnt);
+ }
+ }
+ else
+ {
+ for (int i = 0; i < len; i++)
+ {
+ short val;
+
+ byte* valPtr = (byte*)&val;
+
+ valPtr[1] = *data++;
+ valPtr[0] = *data++;
+
+ res[i] = val;
+ }
+ }
+
+ return res;
+ }
+
+ /** <inheritdoc /> */
+ public void WriteChar(char val)
+ {
+ WriteShort(*(short*)(&val));
+ }
+
+ /** <inheritdoc /> */
+ public char ReadChar()
+ {
+ short val = ReadShort();
+
+ return *(char*)(&val);
+ }
+
+ /// <summary>
+ /// Internal routine to write char array.
+ /// </summary>
+ /// <param name="val">Char array.</param>
+ /// <param name="data">Data pointer.</param>
+ /// <param name="cnt">Bytes count.</param>
+ private static void WriteCharArray0(char[] val, byte* data, int cnt)
+ {
+ if (LittleEndian)
+ {
+ fixed (char* val0 = val)
+ {
+ CopyMemory((byte*)val0, data, cnt);
+ }
+ }
+ else
+ {
+ byte* curPos = data;
+
+ for (int i = 0; i < val.Length; i++)
+ {
+ char val0 = val[i];
+
+ byte* valPtr = (byte*)&(val0);
+
+ *curPos++ = valPtr[1];
+ *curPos++ = valPtr[0];
+ }
+ }
+ }
+
+ /// <summary>
+ /// Internal routine to read char array.
+ /// </summary>
+ /// <param name="len">Count.</param>
+ /// <param name="data">Data pointer.</param>
+ /// <param name="cnt">Bytes count.</param>
+ /// <returns>Char array</returns>
+ private static char[] ReadCharArray0(int len, byte* data, int cnt)
+ {
+ char[] res = new char[len];
+
+ if (LittleEndian)
+ {
+ fixed (char* res0 = res)
+ {
+ CopyMemory(data, (byte*)res0, cnt);
+ }
+ }
+ else
+ {
+ for (int i = 0; i < len; i++)
+ {
+ char val;
+
+ byte* valPtr = (byte*)&val;
+
+ valPtr[1] = *data++;
+ valPtr[0] = *data++;
+
+ res[i] = val;
+ }
+ }
+
+ return res;
+ }
+
+ /// <summary>
+ /// Internal routine to write int value.
+ /// </summary>
+ /// <param name="val">Int value.</param>
+ /// <param name="data">Data pointer.</param>
+ private static void WriteInt0(int val, byte* data)
+ {
+ if (LittleEndian)
+ *((int*)data) = val;
+ else
+ {
+ byte* valPtr = (byte*)&val;
+
+ data[0] = valPtr[3];
+ data[1] = valPtr[2];
+ data[2] = valPtr[1];
+ data[3] = valPtr[0];
+ }
+ }
+
+ /// <summary>
+ /// Internal routine to read int value.
+ /// </summary>
+ /// <param name="data">Data pointer.</param>
+ /// <returns>Int value</returns>
+ private static int ReadInt0(byte* data) {
+ int val;
+
+ if (LittleEndian)
+ val = *((int*)data);
+ else
+ {
+ byte* valPtr = (byte*)&val;
+
+ valPtr[0] = data[3];
+ valPtr[1] = data[2];
+ valPtr[2] = data[1];
+ valPtr[3] = data[0];
+ }
+
+ return val;
+ }
+
+ /// <summary>
+ /// Internal routine to write int array.
+ /// </summary>
+ /// <param name="val">Int array.</param>
+ /// <param name="data">Data pointer.</param>
+ /// <param name="cnt">Bytes count.</param>
+ private static void WriteIntArray0(int[] val, byte* data, int cnt)
+ {
+ if (LittleEndian)
+ {
+ fixed (int* val0 = val)
+ {
+ CopyMemory((byte*)val0, data, cnt);
+ }
+ }
+ else
+ {
+ byte* curPos = data;
+
+ for (int i = 0; i < val.Length; i++)
+ {
+ int val0 = val[i];
+
+ byte* valPtr = (byte*)&(val0);
+
+ *curPos++ = valPtr[3];
+ *curPos++ = valPtr[2];
+ *curPos++ = valPtr[1];
+ *curPos++ = valPtr[0];
+ }
+ }
+ }
+
+ /// <summary>
+ /// Internal routine to read int array.
+ /// </summary>
+ /// <param name="len">Count.</param>
+ /// <param name="data">Data pointer.</param>
+ /// <param name="cnt">Bytes count.</param>
+ /// <returns>Int array</returns>
+ private static int[] ReadIntArray0(int len, byte* data, int cnt)
+ {
+ int[] res = new int[len];
+
+ if (LittleEndian)
+ {
+ fixed (int* res0 = res)
+ {
+ CopyMemory(data, (byte*)res0, cnt);
+ }
+ }
+ else
+ {
+ for (int i = 0; i < len; i++)
+ {
+ int val;
+
+ byte* valPtr = (byte*)&val;
+
+ valPtr[3] = *data++;
+ valPtr[2] = *data++;
+ valPtr[1] = *data++;
+ valPtr[0] = *data++;
+
+ res[i] = val;
+ }
+ }
+
+ return res;
+ }
+
+ /** <inheritdoc /> */
+ public void WriteFloat(float val)
+ {
+ int val0 = *(int*)(&val);
+
+ WriteInt(val0);
+ }
+
+ /** <inheritdoc /> */
+ public float ReadFloat()
+ {
+ int val = ReadInt();
+
+ return BinaryUtils.IntToFloatBits(val);
+ }
+
+ /// <summary>
+ /// Internal routine to write float array.
+ /// </summary>
+ /// <param name="val">Int array.</param>
+ /// <param name="data">Data pointer.</param>
+ /// <param name="cnt">Bytes count.</param>
+ private static void WriteFloatArray0(float[] val, byte* data, int cnt)
+ {
+ if (LittleEndian)
+ {
+ fixed (float* val0 = val)
+ {
+ CopyMemory((byte*)val0, data, cnt);
+ }
+ }
+ else
+ {
+ byte* curPos = data;
+
+ for (int i = 0; i < val.Length; i++)
+ {
+ float val0 = val[i];
+
+ byte* valPtr = (byte*)&(val0);
+
+ *curPos++ = valPtr[3];
+ *curPos++ = valPtr[2];
+ *curPos++ = valPtr[1];
+ *curPos++ = valPtr[0];
+ }
+ }
+ }
+
+ /// <summary>
+ /// Internal routine to read float array.
+ /// </summary>
+ /// <param name="len">Count.</param>
+ /// <param name="data">Data pointer.</param>
+ /// <param name="cnt">Bytes count.</param>
+ /// <returns>Float array</returns>
+ private static float[] ReadFloatArray0(int len, byte* data, int cnt)
+ {
+ float[] res = new float[len];
+
+ if (LittleEndian)
+ {
+ fixed (float* res0 = res)
+ {
+ CopyMemory(data, (byte*)res0, cnt);
+ }
+ }
+ else
+ {
+ for (int i = 0; i < len; i++)
+ {
+ int val;
+
+ byte* valPtr = (byte*)&val;
+
+ valPtr[3] = *data++;
+ valPtr[2] = *data++;
+ valPtr[1] = *data++;
+ valPtr[0] = *data++;
+
+ res[i] = val;
+ }
+ }
+
+ return res;
+ }
+
+ /// <summary>
+ /// Internal routine to write long value.
+ /// </summary>
+ /// <param name="val">Long value.</param>
+ /// <param name="data">Data pointer.</param>
+ private static void WriteLong0(long val, byte* data)
+ {
+ if (LittleEndian)
+ *((long*)data) = val;
+ else
+ {
+ byte* valPtr = (byte*)&val;
+
+ data[0] = valPtr[7];
+ data[1] = valPtr[6];
+ data[2] = valPtr[5];
+ data[3] = valPtr[4];
+ data[4] = valPtr[3];
+ data[5] = valPtr[2];
+ data[6] = valPtr[1];
+ data[7] = valPtr[0];
+ }
+ }
+
+ /// <summary>
+ /// Internal routine to read long value.
+ /// </summary>
+ /// <param name="data">Data pointer.</param>
+ /// <returns>Long value</returns>
+ private static long ReadLong0(byte* data)
+ {
+ long val;
+
+ if (LittleEndian)
+ val = *((long*)data);
+ else
+ {
+ byte* valPtr = (byte*)&val;
+
+ valPtr[0] = data[7];
+ valPtr[1] = data[6];
+ valPtr[2] = data[5];
+ valPtr[3] = data[4];
+ valPtr[4] = data[3];
+ valPtr[5] = data[2];
+ valPtr[6] = data[1];
+ valPtr[7] = data[0];
+ }
+
+ return val;
+ }
+
+ /// <summary>
+ /// Internal routine to write long array.
+ /// </summary>
+ /// <param name="val">Long array.</param>
+ /// <param name="data">Data pointer.</param>
+ /// <param name="cnt">Bytes count.</param>
+ private static void WriteLongArray0(long[] val, byte* data, int cnt)
+ {
+ if (LittleEndian)
+ {
+ fixed (long* val0 = val)
+ {
+ CopyMemory((byte*)val0, data, cnt);
+ }
+ }
+ else
+ {
+ byte* curPos = data;
+
+ for (int i = 0; i < val.Length; i++)
+ {
+ long val0 = val[i];
+
+ byte* valPtr = (byte*)&(val0);
+
+ *curPos++ = valPtr[7];
+ *curPos++ = valPtr[6];
+ *curPos++ = valPtr[5];
+ *curPos++ = valPtr[4];
+ *curPos++ = valPtr[3];
+ *curPos++ = valPtr[2];
+ *curPos++ = valPtr[1];
+ *curPos++ = valPtr[0];
+ }
+ }
+ }
+
+ /// <summary>
+ /// Internal routine to read long array.
+ /// </summary>
+ /// <param name="len">Count.</param>
+ /// <param name="data">Data pointer.</param>
+ /// <param name="cnt">Bytes count.</param>
+ /// <returns>Long array</returns>
+ private static long[] ReadLongArray0(int len, byte* data, int cnt)
+ {
+ long[] res = new long[len];
+
+ if (LittleEndian)
+ {
+ fixed (long* res0 = res)
+ {
+ CopyMemory(data, (byte*)res0, cnt);
+ }
+ }
+ else
+ {
+ for (int i = 0; i < len; i++)
+ {
+ long val;
+
+ byte* valPtr = (byte*)&val;
+
+ valPtr[7] = *data++;
+ valPtr[6] = *data++;
+ valPtr[5] = *data++;
+ valPtr[4] = *data++;
+ valPtr[3] = *data++;
+ valPtr[2] = *data++;
+ valPtr[1] = *data++;
+ valPtr[0] = *data++;
+
+ res[i] = val;
+ }
+ }
+
+ return res;
+ }
+
+ /** <inheritdoc /> */
+ public void WriteDouble(double val)
+ {
+ long val0 = *(long*)(&val);
+
+ WriteLong(val0);
+ }
+
+ /** <inheritdoc /> */
+ public double ReadDouble()
+ {
+ long val = ReadLong();
+
+ return BinaryUtils.LongToDoubleBits(val);
+ }
+
+ /// <summary>
+ /// Internal routine to write double array.
+ /// </summary>
+ /// <param name="val">Double array.</param>
+ /// <param name="data">Data pointer.</param>
+ /// <param name="cnt">Bytes count.</param>
+ private static void WriteDoubleArray0(double[] val, byte* data, int cnt)
+ {
+ if (LittleEndian)
+ {
+ fixed (double* val0 = val)
+ {
+ CopyMemory((byte*)val0, data, cnt);
+ }
+ }
+ else
+ {
+ byte* curPos = data;
+
+ for (int i = 0; i < val.Length; i++)
+ {
+ double val0 = val[i];
+
+ byte* valPtr = (byte*)&(val0);
+
+ *curPos++ = valPtr[7];
+ *curPos++ = valPtr[6];
+ *curPos++ = valPtr[5];
+ *curPos++ = valPtr[4];
+ *curPos++ = valPtr[3];
+ *curPos++ = valPtr[2];
+ *curPos++ = valPtr[1];
+ *curPos++ = valPtr[0];
+ }
+ }
+ }
+
+ /// <summary>
+ /// Internal routine to read double array.
+ /// </summary>
+ /// <param name="len">Count.</param>
+ /// <param name="data">Data pointer.</param>
+ /// <param name="cnt">Bytes count.</param>
+ /// <returns>Double array</returns>
+ private static double[] ReadDoubleArray0(int len, byte* data, int cnt)
+ {
+ double[] res = new double[len];
+
+ if (LittleEndian)
+ {
+ fixed (double* res0 = res)
+ {
+ CopyMemory(data, (byte*)res0, cnt);
+ }
+ }
+ else
+ {
+ for (int i = 0; i < len; i++)
+ {
+ double val;
+
+ byte* valPtr = (byte*)&val;
+
+ valPtr[7] = *data++;
+ valPtr[6] = *data++;
+ valPtr[5] = *data++;
+ valPtr[4] = *data++;
+ valPtr[3] = *data++;
+ valPtr[2] = *data++;
+ valPtr[1] = *data++;
+ valPtr[0] = *data++;
+
+ res[i] = val;
+ }
+ }
+
+ return res;
+ }
+
+ /** <inheritdoc /> */
+ public void Write(byte[] src, int off, int cnt)
+ {
+ fixed (byte* src0 = src)
+ {
+ Write(src0 + off, cnt);
+ }
+ }
+
+ /** <inheritdoc /> */
+ public void Read(byte[] dest, int off, int cnt)
+ {
+ fixed (byte* dest0 = dest)
+ {
+ Read(dest0 + off, cnt);
+ }
+ }
+
+ /// <summary>
+ /// Internal write routine.
+ /// </summary>
+ /// <param name="src">Source.</param>
+ /// <param name="cnt">Count.</param>
+ /// <param name="data">Data (dsetination).</param>
+ private void WriteInternal(byte* src, int cnt, byte* data)
+ {
+ CopyMemory(src, data + _pos, cnt);
+ }
+
+ /// <summary>
+ /// Internal read routine.
+ /// </summary>
+ /// <param name="src">Source</param>
+ /// <param name="dest">Destination.</param>
+ /// <param name="cnt">Count.</param>
+ /// <returns>Amount of bytes written.</returns>
+ private void ReadInternal(byte* src, byte* dest, int cnt)
+ {
+ int cnt0 = Math.Min(Remaining, cnt);
+
+ CopyMemory(src + _pos, dest, cnt0);
+
+ ShiftRead(cnt0);
+ }
+
+ /** <inheritdoc /> */
+ public int Position
+ {
+ get { return _pos; }
+ }
+
/** <inheritdoc /> */
- public override void WriteByte(byte val)
+ public int Remaining
+ {
+ get { return _data.Length - _pos; }
+ }
+
+ /// <summary>
+ /// Internal array.
+ /// </summary>
+ internal byte[] InternalArray
+ {
+ get { return _data; }
+ }
+
+ /// <inheritdoc />
+ /// <exception cref="T:System.ArgumentException">
+ /// Unsupported seek origin: + origin
+ /// or
+ /// Seek before origin: + newPos
+ /// </exception>
+ public int Seek(int offset, SeekOrigin origin)
+ {
+ int newPos;
+
+ switch (origin)
+ {
+ case SeekOrigin.Begin:
+ {
+ newPos = offset;
+
+ break;
+ }
+
+ case SeekOrigin.Current:
+ {
+ newPos = _pos + offset;
+
+ break;
+ }
+
+ default:
+ throw new ArgumentException("Unsupported seek origin: " + origin);
+ }
+
+ if (newPos < 0)
+ throw new ArgumentException("Seek before origin: " + newPos);
+
+ EnsureWriteCapacity(newPos);
+
+ _pos = newPos;
+
+ return _pos;
+ }
+
+ /** <inheritdoc /> */
+ public void Flush()
+ {
+ // No-op.
+ }
+
+ /** <inheritdoc /> */
+ public void Dispose()
+ {
+ if (_disposed)
+ return;
+
+ GC.SuppressFinalize(this);
+
+ _disposed = true;
+ }
+
+ /// <summary>
+ /// Ensure capacity for write and shift position.
+ /// </summary>
+ /// <param name="cnt">Bytes count.</param>
+ /// <returns>Position before shift.</returns>
+ private int EnsureWriteCapacityAndShift(int cnt)
+ {
+ int pos0 = _pos;
+
+ EnsureWriteCapacity(_pos + cnt);
+
+ ShiftWrite(cnt);
+
+ return pos0;
+ }
+
+ /// <summary>
+ /// Ensure capacity for read and shift position.
+ /// </summary>
+ /// <param name="cnt">Bytes count.</param>
+ /// <returns>Position before shift.</returns>
+ private int EnsureReadCapacityAndShift(int cnt)
+ {
+ int pos0 = _pos;
+
+ EnsureReadCapacity(cnt);
+
+ ShiftRead(cnt);
+
+ return pos0;
+ }
+
+ /// <summary>
+ /// Shift position due to write
+ /// </summary>
+ /// <param name="cnt">Bytes count.</param>
+ private void ShiftWrite(int cnt)
+ {
+ _pos += cnt;
+ }
+
+ /// <summary>
+ /// Shift position due to read.
+ /// </summary>
+ /// <param name="cnt">Bytes count.</param>
+ private void ShiftRead(int cnt)
+ {
+ _pos += cnt;
+ }
+
+ /// <summary>
+ /// Calculate new capacity.
+ /// </summary>
+ /// <param name="curCap">Current capacity.</param>
+ /// <param name="reqCap">Required capacity.</param>
+ /// <returns>New capacity.</returns>
+ private static int Capacity(int curCap, int reqCap)
+ {
+ int newCap;
+
+ if (reqCap < 256)
+ newCap = 256;
+ else
+ {
+ newCap = curCap << 1;
+
+ if (newCap < reqCap)
+ newCap = reqCap;
+ }
+
+ return newCap;
+ }
+
+ /// <summary>
+ /// Unsafe memory copy routine.
+ /// </summary>
+ /// <param name="src">Source.</param>
+ /// <param name="dest">Destination.</param>
+ /// <param name="len">Length.</param>
+ private static void CopyMemory(byte* src, byte* dest, int len)
+ {
+ PlatformMemoryUtils.CopyMemory(src, dest, len);
+ }
+
+ /** <inheritdoc /> */
+ public void WriteByte(byte val)
{
int pos0 = EnsureWriteCapacityAndShift(1);
@@ -62,7 +973,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
}
/** <inheritdoc /> */
- public override byte ReadByte()
+ public byte ReadByte()
{
int pos0 = EnsureReadCapacityAndShift(1);
@@ -71,7 +982,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
/** <inheritdoc /> */
[SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
- public override void WriteByteArray(byte[] val)
+ public void WriteByteArray(byte[] val)
{
int pos0 = EnsureWriteCapacityAndShift(val.Length);
@@ -82,7 +993,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
}
/** <inheritdoc /> */
- public override byte[] ReadByteArray(int cnt)
+ public byte[] ReadByteArray(int cnt)
{
int pos0 = EnsureReadCapacityAndShift(cnt);
@@ -94,7 +1005,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
/** <inheritdoc /> */
[SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
- public override void WriteBoolArray(bool[] val)
+ public void WriteBoolArray(bool[] val)
{
int pos0 = EnsureWriteCapacityAndShift(val.Length);
@@ -105,7 +1016,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
}
/** <inheritdoc /> */
- public override bool[] ReadBoolArray(int cnt)
+ public bool[] ReadBoolArray(int cnt)
{
int pos0 = EnsureReadCapacityAndShift(cnt);
@@ -116,7 +1027,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
}
/** <inheritdoc /> */
- public override void WriteShort(short val)
+ public void WriteShort(short val)
{
int pos0 = EnsureWriteCapacityAndShift(2);
@@ -127,7 +1038,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
}
/** <inheritdoc /> */
- public override short ReadShort()
+ public short ReadShort()
{
int pos0 = EnsureReadCapacityAndShift(2);
@@ -139,7 +1050,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
/** <inheritdoc /> */
[SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
- public override void WriteShortArray(short[] val)
+ public void WriteShortArray(short[] val)
{
int cnt = val.Length << 1;
@@ -152,7 +1063,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
}
/** <inheritdoc /> */
- public override short[] ReadShortArray(int cnt)
+ public short[] ReadShortArray(int cnt)
{
int cnt0 = cnt << 1;
@@ -166,7 +1077,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
/** <inheritdoc /> */
[SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
- public override void WriteCharArray(char[] val)
+ public void WriteCharArray(char[] val)
{
int cnt = val.Length << 1;
@@ -179,7 +1090,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
}
/** <inheritdoc /> */
- public override char[] ReadCharArray(int cnt)
+ public char[] ReadCharArray(int cnt)
{
int cnt0 = cnt << 1;
@@ -192,7 +1103,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
}
/** <inheritdoc /> */
- public override void WriteInt(int val)
+ public void WriteInt(int val)
{
int pos0 = EnsureWriteCapacityAndShift(4);
@@ -203,7 +1114,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
}
/** <inheritdoc /> */
- public override void WriteInt(int writePos, int val)
+ public void WriteInt(int writePos, int val)
{
EnsureWriteCapacity(writePos + 4);
@@ -214,7 +1125,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
}
/** <inheritdoc /> */
- public override int ReadInt()
+ public int ReadInt()
{
int pos0 = EnsureReadCapacityAndShift(4);
@@ -226,7 +1137,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
/** <inheritdoc /> */
[SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
- public override void WriteIntArray(int[] val)
+ public void WriteIntArray(int[] val)
{
int cnt = val.Length << 2;
@@ -239,7 +1150,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
}
/** <inheritdoc /> */
- public override int[] ReadIntArray(int cnt)
+ public int[] ReadIntArray(int cnt)
{
int cnt0 = cnt << 2;
@@ -253,7 +1164,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
/** <inheritdoc /> */
[SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
- public override void WriteFloatArray(float[] val)
+ public void WriteFloatArray(float[] val)
{
int cnt = val.Length << 2;
@@ -266,7 +1177,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
}
/** <inheritdoc /> */
- public override float[] ReadFloatArray(int cnt)
+ public float[] ReadFloatArray(int cnt)
{
int cnt0 = cnt << 2;
@@ -279,7 +1190,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
}
/** <inheritdoc /> */
- public override void WriteLong(long val)
+ public void WriteLong(long val)
{
int pos0 = EnsureWriteCapacityAndShift(8);
@@ -290,7 +1201,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
}
/** <inheritdoc /> */
- public override long ReadLong()
+ public long ReadLong()
{
int pos0 = EnsureReadCapacityAndShift(8);
@@ -302,7 +1213,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
/** <inheritdoc /> */
[SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
- public override void WriteLongArray(long[] val)
+ public void WriteLongArray(long[] val)
{
int cnt = val.Length << 3;
@@ -315,7 +1226,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
}
/** <inheritdoc /> */
- public override long[] ReadLongArray(int cnt)
+ public long[] ReadLongArray(int cnt)
{
int cnt0 = cnt << 3;
@@ -329,7 +1240,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
/** <inheritdoc /> */
[SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
- public override void WriteDoubleArray(double[] val)
+ public void WriteDoubleArray(double[] val)
{
int cnt = val.Length << 3;
@@ -342,7 +1253,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
}
/** <inheritdoc /> */
- public override double[] ReadDoubleArray(int cnt)
+ public double[] ReadDoubleArray(int cnt)
{
int cnt0 = cnt << 3;
@@ -355,7 +1266,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
}
/** <inheritdoc /> */
- public override int WriteString(char* chars, int charCnt, int byteCnt, Encoding encoding)
+ public int WriteString(char* chars, int charCnt, int byteCnt, Encoding encoding)
{
int pos0 = EnsureWriteCapacityAndShift(byteCnt);
@@ -370,9 +1281,9 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
}
/** <inheritdoc /> */
- public override void Write(byte* src, int cnt)
+ public void Write(byte* src, int cnt)
{
- EnsureWriteCapacity(Pos + cnt);
+ EnsureWriteCapacity(_pos + cnt);
fixed (byte* data0 = _data)
{
@@ -383,7 +1294,7 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
}
/** <inheritdoc /> */
- public override void Read(byte* dest, int cnt)
+ public void Read(byte* dest, int cnt)
{
fixed (byte* data0 = _data)
{
@@ -392,36 +1303,30 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
}
/** <inheritdoc /> */
- public override int Remaining
- {
- get { return _data.Length - Pos; }
- }
-
- /** <inheritdoc /> */
- public override byte[] GetArray()
+ public byte[] GetArray()
{
return _data;
}
/** <inheritdoc /> */
- public override byte[] GetArrayCopy()
+ public byte[] GetArrayCopy()
{
- byte[] copy = new byte[Pos];
+ byte[] copy = new byte[_pos];
- Buffer.BlockCopy(_data, 0, copy, 0, Pos);
+ Buffer.BlockCopy(_data, 0, copy, 0, _pos);
return copy;
}
/** <inheritdoc /> */
- public override bool IsSameArray(byte[] arr)
+ public bool IsSameArray(byte[] arr)
{
return _data == arr;
}
/** <inheritdoc /> */
[SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
- public override T Apply<TArg, T>(IBinaryStreamProcessor<TArg, T> proc, TArg arg)
+ public T Apply<TArg, T>(IBinaryStreamProcessor<TArg, T> proc, TArg arg)
{
Debug.Assert(proc != null);
@@ -431,22 +1336,11 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
}
}
- /** <inheritdoc /> */
- protected override void Dispose(bool disposing)
- {
- // No-op.
- }
-
/// <summary>
- /// Internal array.
+ /// Ensure capacity for write.
/// </summary>
- internal byte[] InternalArray
- {
- get { return _data; }
- }
-
- /** <inheritdoc /> */
- protected override void EnsureWriteCapacity(int cnt)
+ /// <param name="cnt">Bytes count.</param>
+ private void EnsureWriteCapacity(int cnt)
{
if (cnt > _data.Length)
{
@@ -462,12 +1356,16 @@ namespace Apache.Ignite.Core.Impl.Binary.IO
}
}
- /** <inheritdoc /> */
- protected override void EnsureReadCapacity(int cnt)
+ /// <summary>
+ /// Ensure capacity for write and shift position.
+ /// </summary>
+ /// <param name="cnt">Bytes count.</param>
+ /// <returns>Position before shift.</returns>
+ private void EnsureReadCapacity(int cnt)
{
- if (_data.Length - Pos < cnt)
+ if (_data.Length - _pos < cnt)
throw new EndOfStreamException("Not enough data in stream [expected=" + cnt +
- ", remaining=" + (_data.Length - Pos) + ']');
+ ", remaining=" + (_data.Length - _pos) + ']');
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b8b7c508/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryStreamBase.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryStreamBase.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryStreamBase.cs
deleted file mode 100644
index 0b855f8..0000000
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryStreamBase.cs
+++ /dev/null
@@ -1,1249 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache.Ignite.Core.Impl.Binary.IO
-{
- using System;
- using System.IO;
- using System.Text;
- using Apache.Ignite.Core.Impl.Memory;
-
- /// <summary>
- /// Base class for managed and unmanaged data streams.
- /// </summary>
- internal abstract unsafe class BinaryStreamBase : IBinaryStream
- {
- /** Byte: zero. */
- private const byte ByteZero = 0;
-
- /** Byte: one. */
- private const byte ByteOne = 1;
-
- /** LITTLE_ENDIAN flag. */
- private static readonly bool LittleEndian = BitConverter.IsLittleEndian;
-
- /** Position. */
- protected int Pos;
-
- /** Disposed flag. */
- private bool _disposed;
-
- /// <summary>
- /// Write byte.
- /// </summary>
- /// <param name="val">Byte value.</param>
- public abstract void WriteByte(byte val);
-
- /// <summary>
- /// Read byte.
- /// </summary>
- /// <returns>
- /// Byte value.
- /// </returns>
- public abstract byte ReadByte();
-
- /// <summary>
- /// Write byte array.
- /// </summary>
- /// <param name="val">Byte array.</param>
- public abstract void WriteByteArray(byte[] val);
-
- /// <summary>
- /// Internal routine to write byte array.
- /// </summary>
- /// <param name="val">Byte array.</param>
- /// <param name="data">Data pointer.</param>
- protected static void WriteByteArray0(byte[] val, byte* data)
- {
- fixed (byte* val0 = val)
- {
- CopyMemory(val0, data, val.Length);
- }
- }
-
- /// <summary>
- /// Read byte array.
- /// </summary>
- /// <param name="cnt">Count.</param>
- /// <returns>
- /// Byte array.
- /// </returns>
- public abstract byte[] ReadByteArray(int cnt);
-
- /// <summary>
- /// Internal routine to read byte array.
- /// </summary>
- /// <param name="len">Array length.</param>
- /// <param name="data">Data pointer.</param>
- /// <returns>Byte array</returns>
- protected static byte[] ReadByteArray0(int len, byte* data)
- {
- byte[] res = new byte[len];
-
- fixed (byte* res0 = res)
- {
- CopyMemory(data, res0, len);
- }
-
- return res;
- }
-
- /// <summary>
- /// Write bool.
- /// </summary>
- /// <param name="val">Bool value.</param>
- public void WriteBool(bool val)
- {
- WriteByte(val ? ByteOne : ByteZero);
- }
-
- /// <summary>
- /// Read bool.
- /// </summary>
- /// <returns>
- /// Bool value.
- /// </returns>
- public bool ReadBool()
- {
- return ReadByte() == ByteOne;
- }
-
- /// <summary>
- /// Write bool array.
- /// </summary>
- /// <param name="val">Bool array.</param>
- public abstract void WriteBoolArray(bool[] val);
-
- /// <summary>
- /// Internal routine to write bool array.
- /// </summary>
- /// <param name="val">Bool array.</param>
- /// <param name="data">Data pointer.</param>
- protected static void WriteBoolArray0(bool[] val, byte* data)
- {
- fixed (bool* val0 = val)
- {
- CopyMemory((byte*)val0, data, val.Length);
- }
- }
-
- /// <summary>
- /// Read bool array.
- /// </summary>
- /// <param name="cnt">Count.</param>
- /// <returns>
- /// Bool array.
- /// </returns>
- public abstract bool[] ReadBoolArray(int cnt);
-
- /// <summary>
- /// Internal routine to read bool array.
- /// </summary>
- /// <param name="len">Array length.</param>
- /// <param name="data">Data pointer.</param>
- /// <returns>Bool array</returns>
- protected static bool[] ReadBoolArray0(int len, byte* data)
- {
- bool[] res = new bool[len];
-
- fixed (bool* res0 = res)
- {
- CopyMemory(data, (byte*)res0, len);
- }
-
- return res;
- }
-
- /// <summary>
- /// Write short.
- /// </summary>
- /// <param name="val">Short value.</param>
- public abstract void WriteShort(short val);
-
- /// <summary>
- /// Internal routine to write short value.
- /// </summary>
- /// <param name="val">Short value.</param>
- /// <param name="data">Data pointer.</param>
- protected static void WriteShort0(short val, byte* data)
- {
- if (LittleEndian)
- *((short*)data) = val;
- else
- {
- byte* valPtr = (byte*)&val;
-
- data[0] = valPtr[1];
- data[1] = valPtr[0];
- }
- }
-
- /// <summary>
- /// Read short.
- /// </summary>
- /// <returns>
- /// Short value.
- /// </returns>
- public abstract short ReadShort();
-
- /// <summary>
- /// Internal routine to read short value.
- /// </summary>
- /// <param name="data">Data pointer.</param>
- /// <returns>Short value</returns>
- protected static short ReadShort0(byte* data)
- {
- short val;
-
- if (LittleEndian)
- val = *((short*)data);
- else
- {
- byte* valPtr = (byte*)&val;
-
- valPtr[0] = data[1];
- valPtr[1] = data[0];
- }
-
- return val;
- }
-
- /// <summary>
- /// Write short array.
- /// </summary>
- /// <param name="val">Short array.</param>
- public abstract void WriteShortArray(short[] val);
-
- /// <summary>
- /// Internal routine to write short array.
- /// </summary>
- /// <param name="val">Short array.</param>
- /// <param name="data">Data pointer.</param>
- /// <param name="cnt">Bytes count.</param>
- protected static void WriteShortArray0(short[] val, byte* data, int cnt)
- {
- if (LittleEndian)
- {
- fixed (short* val0 = val)
- {
- CopyMemory((byte*)val0, data, cnt);
- }
- }
- else
- {
- byte* curPos = data;
-
- for (int i = 0; i < val.Length; i++)
- {
- short val0 = val[i];
-
- byte* valPtr = (byte*)&(val0);
-
- *curPos++ = valPtr[1];
- *curPos++ = valPtr[0];
- }
- }
- }
-
- /// <summary>
- /// Read short array.
- /// </summary>
- /// <param name="cnt">Count.</param>
- /// <returns>
- /// Short array.
- /// </returns>
- public abstract short[] ReadShortArray(int cnt);
-
- /// <summary>
- /// Internal routine to read short array.
- /// </summary>
- /// <param name="len">Array length.</param>
- /// <param name="data">Data pointer.</param>
- /// <param name="cnt">Bytes count.</param>
- /// <returns>Short array</returns>
- protected static short[] ReadShortArray0(int len, byte* data, int cnt)
- {
- short[] res = new short[len];
-
- if (LittleEndian)
- {
- fixed (short* res0 = res)
- {
- CopyMemory(data, (byte*)res0, cnt);
- }
- }
- else
- {
- for (int i = 0; i < len; i++)
- {
- short val;
-
- byte* valPtr = (byte*)&val;
-
- valPtr[1] = *data++;
- valPtr[0] = *data++;
-
- res[i] = val;
- }
- }
-
- return res;
- }
-
- /// <summary>
- /// Write char.
- /// </summary>
- /// <param name="val">Char value.</param>
- public void WriteChar(char val)
- {
- WriteShort(*(short*)(&val));
- }
-
- /// <summary>
- /// Read char.
- /// </summary>
- /// <returns>
- /// Char value.
- /// </returns>
- public char ReadChar()
- {
- short val = ReadShort();
-
- return *(char*)(&val);
- }
-
- /// <summary>
- /// Write char array.
- /// </summary>
- /// <param name="val">Char array.</param>
- public abstract void WriteCharArray(char[] val);
-
- /// <summary>
- /// Internal routine to write char array.
- /// </summary>
- /// <param name="val">Char array.</param>
- /// <param name="data">Data pointer.</param>
- /// <param name="cnt">Bytes count.</param>
- protected static void WriteCharArray0(char[] val, byte* data, int cnt)
- {
- if (LittleEndian)
- {
- fixed (char* val0 = val)
- {
- CopyMemory((byte*)val0, data, cnt);
- }
- }
- else
- {
- byte* curPos = data;
-
- for (int i = 0; i < val.Length; i++)
- {
- char val0 = val[i];
-
- byte* valPtr = (byte*)&(val0);
-
- *curPos++ = valPtr[1];
- *curPos++ = valPtr[0];
- }
- }
- }
-
- /// <summary>
- /// Read char array.
- /// </summary>
- /// <param name="cnt">Count.</param>
- /// <returns>
- /// Char array.
- /// </returns>
- public abstract char[] ReadCharArray(int cnt);
-
- /// <summary>
- /// Internal routine to read char array.
- /// </summary>
- /// <param name="len">Count.</param>
- /// <param name="data">Data pointer.</param>
- /// <param name="cnt">Bytes count.</param>
- /// <returns>Char array</returns>
- protected static char[] ReadCharArray0(int len, byte* data, int cnt)
- {
- char[] res = new char[len];
-
- if (LittleEndian)
- {
- fixed (char* res0 = res)
- {
- CopyMemory(data, (byte*)res0, cnt);
- }
- }
- else
- {
- for (int i = 0; i < len; i++)
- {
- char val;
-
- byte* valPtr = (byte*)&val;
-
- valPtr[1] = *data++;
- valPtr[0] = *data++;
-
- res[i] = val;
- }
- }
-
- return res;
- }
-
- /// <summary>
- /// Write int.
- /// </summary>
- /// <param name="val">Int value.</param>
- public abstract void WriteInt(int val);
-
- /// <summary>
- /// Write int to specific position.
- /// </summary>
- /// <param name="writePos">Position.</param>
- /// <param name="val">Value.</param>
- public abstract void WriteInt(int writePos, int val);
-
- /// <summary>
- /// Internal routine to write int value.
- /// </summary>
- /// <param name="val">Int value.</param>
- /// <param name="data">Data pointer.</param>
- protected static void WriteInt0(int val, byte* data)
- {
- if (LittleEndian)
- *((int*)data) = val;
- else
- {
- byte* valPtr = (byte*)&val;
-
- data[0] = valPtr[3];
- data[1] = valPtr[2];
- data[2] = valPtr[1];
- data[3] = valPtr[0];
- }
- }
-
- /// <summary>
- /// Read int.
- /// </summary>
- /// <returns>
- /// Int value.
- /// </returns>
- public abstract int ReadInt();
-
- /// <summary>
- /// Internal routine to read int value.
- /// </summary>
- /// <param name="data">Data pointer.</param>
- /// <returns>Int value</returns>
- protected static int ReadInt0(byte* data) {
- int val;
-
- if (LittleEndian)
- val = *((int*)data);
- else
- {
- byte* valPtr = (byte*)&val;
-
- valPtr[0] = data[3];
- valPtr[1] = data[2];
- valPtr[2] = data[1];
- valPtr[3] = data[0];
- }
-
- return val;
- }
-
- /// <summary>
- /// Write int array.
- /// </summary>
- /// <param name="val">Int array.</param>
- public abstract void WriteIntArray(int[] val);
-
- /// <summary>
- /// Internal routine to write int array.
- /// </summary>
- /// <param name="val">Int array.</param>
- /// <param name="data">Data pointer.</param>
- /// <param name="cnt">Bytes count.</param>
- protected static void WriteIntArray0(int[] val, byte* data, int cnt)
- {
- if (LittleEndian)
- {
- fixed (int* val0 = val)
- {
- CopyMemory((byte*)val0, data, cnt);
- }
- }
- else
- {
- byte* curPos = data;
-
- for (int i = 0; i < val.Length; i++)
- {
- int val0 = val[i];
-
- byte* valPtr = (byte*)&(val0);
-
- *curPos++ = valPtr[3];
- *curPos++ = valPtr[2];
- *curPos++ = valPtr[1];
- *curPos++ = valPtr[0];
- }
- }
- }
-
- /// <summary>
- /// Read int array.
- /// </summary>
- /// <param name="cnt">Count.</param>
- /// <returns>
- /// Int array.
- /// </returns>
- public abstract int[] ReadIntArray(int cnt);
-
- /// <summary>
- /// Internal routine to read int array.
- /// </summary>
- /// <param name="len">Count.</param>
- /// <param name="data">Data pointer.</param>
- /// <param name="cnt">Bytes count.</param>
- /// <returns>Int array</returns>
- protected static int[] ReadIntArray0(int len, byte* data, int cnt)
- {
- int[] res = new int[len];
-
- if (LittleEndian)
- {
- fixed (int* res0 = res)
- {
- CopyMemory(data, (byte*)res0, cnt);
- }
- }
- else
- {
- for (int i = 0; i < len; i++)
- {
- int val;
-
- byte* valPtr = (byte*)&val;
-
- valPtr[3] = *data++;
- valPtr[2] = *data++;
- valPtr[1] = *data++;
- valPtr[0] = *data++;
-
- res[i] = val;
- }
- }
-
- return res;
- }
-
- /// <summary>
- /// Write float.
- /// </summary>
- /// <param name="val">Float value.</param>
- public void WriteFloat(float val)
- {
- int val0 = *(int*)(&val);
-
- WriteInt(val0);
- }
-
- /// <summary>
- /// Read float.
- /// </summary>
- /// <returns>
- /// Float value.
- /// </returns>
- public float ReadFloat()
- {
- int val = ReadInt();
-
- return BinaryUtils.IntToFloatBits(val);
- }
-
- /// <summary>
- /// Write float array.
- /// </summary>
- /// <param name="val">Float array.</param>
- public abstract void WriteFloatArray(float[] val);
-
- /// <summary>
- /// Internal routine to write float array.
- /// </summary>
- /// <param name="val">Int array.</param>
- /// <param name="data">Data pointer.</param>
- /// <param name="cnt">Bytes count.</param>
- protected static void WriteFloatArray0(float[] val, byte* data, int cnt)
- {
- if (LittleEndian)
- {
- fixed (float* val0 = val)
- {
- CopyMemory((byte*)val0, data, cnt);
- }
- }
- else
- {
- byte* curPos = data;
-
- for (int i = 0; i < val.Length; i++)
- {
- float val0 = val[i];
-
- byte* valPtr = (byte*)&(val0);
-
- *curPos++ = valPtr[3];
- *curPos++ = valPtr[2];
- *curPos++ = valPtr[1];
- *curPos++ = valPtr[0];
- }
- }
- }
-
- /// <summary>
- /// Read float array.
- /// </summary>
- /// <param name="cnt">Count.</param>
- /// <returns>
- /// Float array.
- /// </returns>
- public abstract float[] ReadFloatArray(int cnt);
-
- /// <summary>
- /// Internal routine to read float array.
- /// </summary>
- /// <param name="len">Count.</param>
- /// <param name="data">Data pointer.</param>
- /// <param name="cnt">Bytes count.</param>
- /// <returns>Float array</returns>
- protected static float[] ReadFloatArray0(int len, byte* data, int cnt)
- {
- float[] res = new float[len];
-
- if (LittleEndian)
- {
- fixed (float* res0 = res)
- {
- CopyMemory(data, (byte*)res0, cnt);
- }
- }
- else
- {
- for (int i = 0; i < len; i++)
- {
- int val;
-
- byte* valPtr = (byte*)&val;
-
- valPtr[3] = *data++;
- valPtr[2] = *data++;
- valPtr[1] = *data++;
- valPtr[0] = *data++;
-
- res[i] = val;
- }
- }
-
- return res;
- }
-
- /// <summary>
- /// Write long.
- /// </summary>
- /// <param name="val">Long value.</param>
- public abstract void WriteLong(long val);
-
- /// <summary>
- /// Internal routine to write long value.
- /// </summary>
- /// <param name="val">Long value.</param>
- /// <param name="data">Data pointer.</param>
- protected static void WriteLong0(long val, byte* data)
- {
- if (LittleEndian)
- *((long*)data) = val;
- else
- {
- byte* valPtr = (byte*)&val;
-
- data[0] = valPtr[7];
- data[1] = valPtr[6];
- data[2] = valPtr[5];
- data[3] = valPtr[4];
- data[4] = valPtr[3];
- data[5] = valPtr[2];
- data[6] = valPtr[1];
- data[7] = valPtr[0];
- }
- }
-
- /// <summary>
- /// Read long.
- /// </summary>
- /// <returns>
- /// Long value.
- /// </returns>
- public abstract long ReadLong();
-
- /// <summary>
- /// Internal routine to read long value.
- /// </summary>
- /// <param name="data">Data pointer.</param>
- /// <returns>Long value</returns>
- protected static long ReadLong0(byte* data)
- {
- long val;
-
- if (LittleEndian)
- val = *((long*)data);
- else
- {
- byte* valPtr = (byte*)&val;
-
- valPtr[0] = data[7];
- valPtr[1] = data[6];
- valPtr[2] = data[5];
- valPtr[3] = data[4];
- valPtr[4] = data[3];
- valPtr[5] = data[2];
- valPtr[6] = data[1];
- valPtr[7] = data[0];
- }
-
- return val;
- }
-
- /// <summary>
- /// Write long array.
- /// </summary>
- /// <param name="val">Long array.</param>
- public abstract void WriteLongArray(long[] val);
-
- /// <summary>
- /// Internal routine to write long array.
- /// </summary>
- /// <param name="val">Long array.</param>
- /// <param name="data">Data pointer.</param>
- /// <param name="cnt">Bytes count.</param>
- protected static void WriteLongArray0(long[] val, byte* data, int cnt)
- {
- if (LittleEndian)
- {
- fixed (long* val0 = val)
- {
- CopyMemory((byte*)val0, data, cnt);
- }
- }
- else
- {
- byte* curPos = data;
-
- for (int i = 0; i < val.Length; i++)
- {
- long val0 = val[i];
-
- byte* valPtr = (byte*)&(val0);
-
- *curPos++ = valPtr[7];
- *curPos++ = valPtr[6];
- *curPos++ = valPtr[5];
- *curPos++ = valPtr[4];
- *curPos++ = valPtr[3];
- *curPos++ = valPtr[2];
- *curPos++ = valPtr[1];
- *curPos++ = valPtr[0];
- }
- }
- }
-
- /// <summary>
- /// Read long array.
- /// </summary>
- /// <param name="cnt">Count.</param>
- /// <returns>
- /// Long array.
- /// </returns>
- public abstract long[] ReadLongArray(int cnt);
-
- /// <summary>
- /// Internal routine to read long array.
- /// </summary>
- /// <param name="len">Count.</param>
- /// <param name="data">Data pointer.</param>
- /// <param name="cnt">Bytes count.</param>
- /// <returns>Long array</returns>
- protected static long[] ReadLongArray0(int len, byte* data, int cnt)
- {
- long[] res = new long[len];
-
- if (LittleEndian)
- {
- fixed (long* res0 = res)
- {
- CopyMemory(data, (byte*)res0, cnt);
- }
- }
- else
- {
- for (int i = 0; i < len; i++)
- {
- long val;
-
- byte* valPtr = (byte*)&val;
-
- valPtr[7] = *data++;
- valPtr[6] = *data++;
- valPtr[5] = *data++;
- valPtr[4] = *data++;
- valPtr[3] = *data++;
- valPtr[2] = *data++;
- valPtr[1] = *data++;
- valPtr[0] = *data++;
-
- res[i] = val;
- }
- }
-
- return res;
- }
-
- /// <summary>
- /// Write double.
- /// </summary>
- /// <param name="val">Double value.</param>
- public void WriteDouble(double val)
- {
- long val0 = *(long*)(&val);
-
- WriteLong(val0);
- }
-
- /// <summary>
- /// Read double.
- /// </summary>
- /// <returns>
- /// Double value.
- /// </returns>
- public double ReadDouble()
- {
- long val = ReadLong();
-
- return BinaryUtils.LongToDoubleBits(val);
- }
-
- /// <summary>
- /// Write double array.
- /// </summary>
- /// <param name="val">Double array.</param>
- public abstract void WriteDoubleArray(double[] val);
-
- /// <summary>
- /// Internal routine to write double array.
- /// </summary>
- /// <param name="val">Double array.</param>
- /// <param name="data">Data pointer.</param>
- /// <param name="cnt">Bytes count.</param>
- protected static void WriteDoubleArray0(double[] val, byte* data, int cnt)
- {
- if (LittleEndian)
- {
- fixed (double* val0 = val)
- {
- CopyMemory((byte*)val0, data, cnt);
- }
- }
- else
- {
- byte* curPos = data;
-
- for (int i = 0; i < val.Length; i++)
- {
- double val0 = val[i];
-
- byte* valPtr = (byte*)&(val0);
-
- *curPos++ = valPtr[7];
- *curPos++ = valPtr[6];
- *curPos++ = valPtr[5];
- *curPos++ = valPtr[4];
- *curPos++ = valPtr[3];
- *curPos++ = valPtr[2];
- *curPos++ = valPtr[1];
- *curPos++ = valPtr[0];
- }
- }
- }
-
- /// <summary>
- /// Read double array.
- /// </summary>
- /// <param name="cnt">Count.</param>
- /// <returns>
- /// Double array.
- /// </returns>
- public abstract double[] ReadDoubleArray(int cnt);
-
- /// <summary>
- /// Internal routine to read double array.
- /// </summary>
- /// <param name="len">Count.</param>
- /// <param name="data">Data pointer.</param>
- /// <param name="cnt">Bytes count.</param>
- /// <returns>Double array</returns>
- protected static double[] ReadDoubleArray0(int len, byte* data, int cnt)
- {
- double[] res = new double[len];
-
- if (LittleEndian)
- {
- fixed (double* res0 = res)
- {
- CopyMemory(data, (byte*)res0, cnt);
- }
- }
- else
- {
- for (int i = 0; i < len; i++)
- {
- double val;
-
- byte* valPtr = (byte*)&val;
-
- valPtr[7] = *data++;
- valPtr[6] = *data++;
- valPtr[5] = *data++;
- valPtr[4] = *data++;
- valPtr[3] = *data++;
- valPtr[2] = *data++;
- valPtr[1] = *data++;
- valPtr[0] = *data++;
-
- res[i] = val;
- }
- }
-
- return res;
- }
-
- /// <summary>
- /// Write string.
- /// </summary>
- /// <param name="chars">Characters.</param>
- /// <param name="charCnt">Char count.</param>
- /// <param name="byteCnt">Byte count.</param>
- /// <param name="encoding">Encoding.</param>
- /// <returns>
- /// Amounts of bytes written.
- /// </returns>
- public abstract int WriteString(char* chars, int charCnt, int byteCnt, Encoding encoding);
-
- /// <summary>
- /// Write arbitrary data.
- /// </summary>
- /// <param name="src">Source array.</param>
- /// <param name="off">Offset</param>
- /// <param name="cnt">Count.</param>
- public void Write(byte[] src, int off, int cnt)
- {
- fixed (byte* src0 = src)
- {
- Write(src0 + off, cnt);
- }
- }
-
- /// <summary>
- /// Read arbitrary data.
- /// </summary>
- /// <param name="dest">Destination array.</param>
- /// <param name="off">Offset.</param>
- /// <param name="cnt">Count.</param>
- /// <returns>
- /// Amount of bytes read.
- /// </returns>
- public void Read(byte[] dest, int off, int cnt)
- {
- fixed (byte* dest0 = dest)
- {
- Read(dest0 + off, cnt);
- }
- }
-
- /// <summary>
- /// Write arbitrary data.
- /// </summary>
- /// <param name="src">Source.</param>
- /// <param name="cnt">Count.</param>
- public abstract void Write(byte* src, int cnt);
-
- /// <summary>
- /// Internal write routine.
- /// </summary>
- /// <param name="src">Source.</param>
- /// <param name="cnt">Count.</param>
- /// <param name="data">Data (dsetination).</param>
- protected void WriteInternal(byte* src, int cnt, byte* data)
- {
- CopyMemory(src, data + Pos, cnt);
- }
-
- /// <summary>
- /// Read arbitrary data.
- /// </summary>
- /// <param name="dest">Destination.</param>
- /// <param name="cnt">Count.</param>
- /// <returns></returns>
- public abstract void Read(byte* dest, int cnt);
-
- /// <summary>
- /// Internal read routine.
- /// </summary>
- /// <param name="src">Source</param>
- /// <param name="dest">Destination.</param>
- /// <param name="cnt">Count.</param>
- /// <returns>Amount of bytes written.</returns>
- protected void ReadInternal(byte* src, byte* dest, int cnt)
- {
- int cnt0 = Math.Min(Remaining, cnt);
-
- CopyMemory(src + Pos, dest, cnt0);
-
- ShiftRead(cnt0);
- }
-
- /// <summary>
- /// Position.
- /// </summary>
- public int Position
- {
- get { return Pos; }
- }
-
- /// <summary>
- /// Gets remaining bytes in the stream.
- /// </summary>
- /// <value>
- /// Remaining bytes.
- /// </value>
- public abstract int Remaining { get; }
-
- /// <summary>
- /// Gets underlying array, avoiding copying if possible.
- /// </summary>
- /// <returns>
- /// Underlying array.
- /// </returns>
- public abstract byte[] GetArray();
-
- /// <summary>
- /// Gets underlying data in a new array.
- /// </summary>
- /// <returns>
- /// New array with data.
- /// </returns>
- public abstract byte[] GetArrayCopy();
-
- /// <summary>
- /// Check whether array passed as argument is the same as the stream hosts.
- /// </summary>
- /// <param name="arr">Array.</param>
- /// <returns>
- /// <c>True</c> if they are same.
- /// </returns>
- public abstract bool IsSameArray(byte[] arr);
-
- /// <summary>
- /// Seek to the given position.
- /// </summary>
- /// <param name="offset">Offset.</param>
- /// <param name="origin">Seek origin.</param>
- /// <returns>
- /// Position.
- /// </returns>
- /// <exception cref="System.ArgumentException">
- /// Unsupported seek origin: + origin
- /// or
- /// Seek before origin: + newPos
- /// </exception>
- public int Seek(int offset, SeekOrigin origin)
- {
- int newPos;
-
- switch (origin)
- {
- case SeekOrigin.Begin:
- {
- newPos = offset;
-
- break;
- }
-
- case SeekOrigin.Current:
- {
- newPos = Pos + offset;
-
- break;
- }
-
- default:
- throw new ArgumentException("Unsupported seek origin: " + origin);
- }
-
- if (newPos < 0)
- throw new ArgumentException("Seek before origin: " + newPos);
-
- EnsureWriteCapacity(newPos);
-
- Pos = newPos;
-
- return Pos;
- }
-
- /// <summary>
- /// Returns a hash code for the specified byte range.
- /// </summary>
- public abstract T Apply<TArg, T>(IBinaryStreamProcessor<TArg, T> proc, TArg arg);
-
- /// <summary>
- /// Flushes the data to underlying storage.
- /// </summary>
- public void Flush()
- {
- // No-op.
- }
-
- /** <inheritdoc /> */
- public void Dispose()
- {
- if (_disposed)
- return;
-
- Dispose(true);
-
- GC.SuppressFinalize(this);
-
- _disposed = true;
- }
-
- /// <summary>
- /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
- /// </summary>
- protected abstract void Dispose(bool disposing);
-
- /// <summary>
- /// Ensure capacity for write.
- /// </summary>
- /// <param name="cnt">Bytes count.</param>
- protected abstract void EnsureWriteCapacity(int cnt);
-
- /// <summary>
- /// Ensure capacity for write and shift position.
- /// </summary>
- /// <param name="cnt">Bytes count.</param>
- /// <returns>Position before shift.</returns>
- protected int EnsureWriteCapacityAndShift(int cnt)
- {
- int pos0 = Pos;
-
- EnsureWriteCapacity(Pos + cnt);
-
- ShiftWrite(cnt);
-
- return pos0;
- }
-
- /// <summary>
- /// Ensure capacity for read.
- /// </summary>
- /// <param name="cnt">Bytes count.</param>
- protected abstract void EnsureReadCapacity(int cnt);
-
- /// <summary>
- /// Ensure capacity for read and shift position.
- /// </summary>
- /// <param name="cnt">Bytes count.</param>
- /// <returns>Position before shift.</returns>
- protected int EnsureReadCapacityAndShift(int cnt)
- {
- int pos0 = Pos;
-
- EnsureReadCapacity(cnt);
-
- ShiftRead(cnt);
-
- return pos0;
- }
-
- /// <summary>
- /// Shift position due to write
- /// </summary>
- /// <param name="cnt">Bytes count.</param>
- protected void ShiftWrite(int cnt)
- {
- Pos += cnt;
- }
-
- /// <summary>
- /// Shift position due to read.
- /// </summary>
- /// <param name="cnt">Bytes count.</param>
- private void ShiftRead(int cnt)
- {
- Pos += cnt;
- }
-
- /// <summary>
- /// Calculate new capacity.
- /// </summary>
- /// <param name="curCap">Current capacity.</param>
- /// <param name="reqCap">Required capacity.</param>
- /// <returns>New capacity.</returns>
- protected static int Capacity(int curCap, int reqCap)
- {
- int newCap;
-
- if (reqCap < 256)
- newCap = 256;
- else
- {
- newCap = curCap << 1;
-
- if (newCap < reqCap)
- newCap = reqCap;
- }
-
- return newCap;
- }
-
- /// <summary>
- /// Unsafe memory copy routine.
- /// </summary>
- /// <param name="src">Source.</param>
- /// <param name="dest">Destination.</param>
- /// <param name="len">Length.</param>
- private static void CopyMemory(byte* src, byte* dest, int len)
- {
- PlatformMemoryUtils.CopyMemory(src, dest, len);
- }
- }
-}
[04/10] ignite git commit: ignite-5932
Posted by sb...@apache.org.
ignite-5932
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/af887544
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/af887544
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/af887544
Branch: refs/heads/ignite-5932
Commit: af887544099dd3e2f427b74a029f601ffddb4471
Parents: 1780062
Author: sboikov <sb...@gridgain.com>
Authored: Thu Oct 12 12:30:02 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Oct 12 12:30:02 2017 +0300
----------------------------------------------------------------------
.../dht/GridPartitionedGetFuture.java | 15 ++++++++--
.../dht/GridPartitionedSingleGetFuture.java | 21 ++++++++++----
.../dht/atomic/GridDhtAtomicCache.java | 6 ++--
.../dht/colocated/GridDhtColocatedCache.java | 30 +++++++++++++-------
.../cache/distributed/near/GridNearTxLocal.java | 23 ++++++++++-----
5 files changed, 68 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/af887544/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 7993d05..7689a4e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -78,6 +78,9 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
private static IgniteLogger log;
/** */
+ protected final MvccCoordinatorVersion mvccVer;
+
+ /** */
private MvccQueryTracker mvccTracker;
/**
@@ -94,6 +97,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
* @param skipVals Skip values flag.
* @param needVer If {@code true} returns values as tuples containing value and version.
* @param keepCacheObjects Keep cache objects flag.
+ * @param mvccVer Mvcc version.
*/
public GridPartitionedGetFuture(
GridCacheContext<K, V> cctx,
@@ -107,7 +111,8 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
@Nullable IgniteCacheExpiryPolicy expiryPlc,
boolean skipVals,
boolean needVer,
- boolean keepCacheObjects
+ boolean keepCacheObjects,
+ @Nullable MvccCoordinatorVersion mvccVer
) {
super(cctx,
keys,
@@ -121,6 +126,9 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
needVer,
keepCacheObjects,
recovery);
+ assert mvccVer == null || cctx.mvccEnabled();
+
+ this.mvccVer = mvccVer;
if (log == null)
log = U.logger(cctx.kernalContext(), logRef, GridPartitionedGetFuture.class);
@@ -133,6 +141,9 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
if (!cctx.mvccEnabled())
return null;
+ if (mvccVer != null)
+ return mvccVer;
+
MvccCoordinatorVersion ver = mvccTracker.mvccVersion();
assert ver != null : "[fut=" + this + ", mvccTracker=" + mvccTracker + "]";
@@ -158,7 +169,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
canRemap ? cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion();
}
- if (cctx.mvccEnabled()) {
+ if (cctx.mvccEnabled() && mvccVer == null) {
mvccTracker = new MvccQueryTracker(cctx, canRemap, this);
trackable = true;
http://git-wip-us.apache.org/repos/asf/ignite/blob/af887544/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index b34687f..afef744 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -41,11 +41,12 @@ import org.apache.ignite.internal.processors.cache.GridCacheFutureAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
@@ -122,6 +123,9 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
@GridToStringInclude
private ClusterNode node;
+ /** */
+ protected final MvccCoordinatorVersion mvccVer;
+
/**
* @param cctx Context.
* @param key Key.
@@ -149,9 +153,11 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
boolean skipVals,
boolean needVer,
boolean keepCacheObjects,
- boolean recovery
+ boolean recovery,
+ @Nullable MvccCoordinatorVersion mvccVer
) {
assert key != null;
+ assert mvccVer == null || cctx.mvccEnabled();
AffinityTopologyVersion lockedTopVer = cctx.shared().lockedTopologyVersion(null);
@@ -176,6 +182,7 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
this.keepCacheObjects = keepCacheObjects;
this.recovery = recovery;
this.topVer = topVer;
+ this.mvccVer = mvccVer;
futId = IgniteUuid.randomUuid();
@@ -275,6 +282,7 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
cctx.mvcc().addFuture(this, futId);
}
+ // TODO IGNITE-3478.
GridCacheMessage req = new GridNearSingleGetRequest(cctx.cacheId(),
futId.localId(),
key,
@@ -355,7 +363,8 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
boolean skipEntry = readNoEntry;
if (readNoEntry) {
- CacheDataRow row = cctx.offheap().read(cctx, key); // TODO IGNITE-3478
+ CacheDataRow row = mvccVer != null ? cctx.offheap().mvccRead(cctx, key, mvccVer) :
+ cctx.offheap().read(cctx, key);
if (row != null) {
long expireTime = row.expireTime();
@@ -398,8 +407,8 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
taskName,
expiryPlc,
true,
- null,
- null); // TODO IGNITE-3478
+ mvccVer,
+ null);
if (res != null) {
v = res.value();
@@ -418,7 +427,7 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
taskName,
expiryPlc,
true,
- null); // TODO IGNITE-3478
+ mvccVer);
}
colocated.context().evicts().touch(entry, topVer);
http://git-wip-us.apache.org/repos/asf/ignite/blob/af887544/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 16416cc..d6862fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1385,7 +1385,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
skipVals,
needVer,
false,
- recovery);
+ recovery,
+ null);
fut.init();
@@ -1591,7 +1592,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
expiry,
skipVals,
needVer,
- false);
+ false,
+ null);
fut.init(topVer);
http://git-wip-us.apache.org/repos/asf/ignite/blob/af887544/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 7364cb3..c975edb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
@@ -241,7 +242,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
skipVals,
needVer,
/*keepCacheObjects*/false,
- opCtx != null && opCtx.recovery());
+ opCtx != null && opCtx.recovery(),
+ null);
fut.init();
@@ -319,7 +321,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
* @param needVer Need version.
* @return Loaded values.
*/
- public IgniteInternalFuture<Map<K, V>> loadAsync(
+ private IgniteInternalFuture<Map<K, V>> loadAsync(
@Nullable Collection<KeyCacheObject> keys,
boolean readThrough,
boolean forcePrimary,
@@ -341,7 +343,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
expiryPlc,
skipVals,
needVer,
- false);
+ false,
+ null);
}
/**
@@ -370,7 +373,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
boolean skipVals,
boolean needVer,
boolean keepCacheObj,
- boolean recovery
+ boolean recovery,
+ @Nullable MvccCoordinatorVersion mvccVer
) {
GridPartitionedSingleGetFuture fut = new GridPartitionedSingleGetFuture(ctx,
ctx.toCacheKeyObject(key),
@@ -384,7 +388,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
skipVals,
needVer,
keepCacheObj,
- recovery);
+ recovery,
+ mvccVer);
fut.init();
@@ -403,6 +408,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
* @param skipVals Skip values flag.
* @param needVer If {@code true} returns values as tuples containing value and version.
* @param keepCacheObj Keep cache objects flag.
+ * @param mvccVer Mvcc version.
* @return Load future.
*/
public final IgniteInternalFuture<Map<K, V>> loadAsync(
@@ -417,8 +423,11 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
@Nullable IgniteCacheExpiryPolicy expiryPlc,
boolean skipVals,
boolean needVer,
- boolean keepCacheObj
+ boolean keepCacheObj,
+ @Nullable MvccCoordinatorVersion mvccVer
) {
+ assert mvccVer == null || ctx.mvccEnabled();
+
if (keys == null || keys.isEmpty())
return new GridFinishedFuture<>(Collections.<K, V>emptyMap());
@@ -426,7 +435,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
expiryPlc = expiryPolicy(null);
// Optimisation: try to resolve value locally and escape 'get future' creation.
- if (!forcePrimary && ctx.affinityNode() && !ctx.mvccEnabled()) {
+ if (!forcePrimary && ctx.affinityNode() && (!ctx.mvccEnabled() || mvccVer != null)) {
try {
Map<K, V> locVals = null;
@@ -499,7 +508,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
taskName,
expiryPlc,
!deserializeBinary,
- null,
+ mvccVer,
null);
if (getRes != null) {
@@ -519,7 +528,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
taskName,
expiryPlc,
!deserializeBinary,
- null);
+ mvccVer);
}
// Entry was not in memory or in swap, so we remove it from cache.
@@ -602,7 +611,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
expiryPlc,
skipVals,
needVer,
- keepCacheObj);
+ keepCacheObj,
+ mvccVer);
fut.init(topVer);
http://git-wip-us.apache.org/repos/asf/ignite/blob/af887544/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index c8dfc9f..08f20de 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -1659,6 +1659,13 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
}
}
+ private MvccCoordinatorVersion mvccReadVersion(GridCacheContext cctx) {
+ if (!cctx.mvccEnabled() || mvccTracker == null)
+ return null;
+
+ return mvccTracker.mvccVersion();
+ }
+
/**
* @param cacheCtx Cache context.
* @param keys Keys to get.
@@ -1830,8 +1837,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
resolveTaskName(),
null,
txEntry.keepBinary(),
- null,
- null); // TODO IGNITE-3478
+ null, // TODO IGNITE-3478
+ null);
if (getRes != null) {
val = getRes.value();
@@ -2214,8 +2221,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
resolveTaskName(),
accessPlc,
!deserializeBinary,
- null,
- null) : null; // TODO IGNITE-3478
+ mvccReadVersion(cacheCtx), // TODO IGNITE-3478
+ null) : null;
if (getRes != null) {
val = getRes.value();
@@ -2234,7 +2241,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
resolveTaskName(),
accessPlc,
!deserializeBinary,
- null); // TODO IGNITE-3478
+ mvccReadVersion(cacheCtx)); // TODO IGNITE-3478
}
if (val != null) {
@@ -2572,7 +2579,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
skipVals,
needVer,
/*keepCacheObject*/true,
- recovery
+ recovery,
+ mvccReadVersion(cacheCtx)
).chain(new C1<IgniteInternalFuture<Object>, Void>() {
@Override public Void apply(IgniteInternalFuture<Object> f) {
try {
@@ -2603,7 +2611,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
expiryPlc0,
skipVals,
needVer,
- /*keepCacheObject*/true
+ /*keepCacheObject*/true,
+ mvccReadVersion(cacheCtx)
).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Void>() {
@Override public Void apply(IgniteInternalFuture<Map<Object, Object>> f) {
try {
[07/10] ignite git commit: ignite-3478 Fixed query ack
Posted by sb...@apache.org.
ignite-3478 Fixed query ack
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f29d4bc5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f29d4bc5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f29d4bc5
Branch: refs/heads/ignite-5932
Commit: f29d4bc50801c530ef856d168fb637b0fad1c27b
Parents: 2374296
Author: sboikov <sb...@gridgain.com>
Authored: Thu Oct 12 13:43:07 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Oct 12 13:43:07 2017 +0300
----------------------------------------------------------------------
.../cache/mvcc/CacheCoordinatorsProcessor.java | 12 ++++++++++--
1 file changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f29d4bc5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
index 9f9a7a3..85dde15 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
@@ -261,7 +261,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
}
crd = crdNode != null ? new
- MvccCoordinator(crdNode.id(), topVer, new AffinityTopologyVersion(topVer, 0)) : null;
+ MvccCoordinator(crdNode.id(), coordinatorVersion(topVer), new AffinityTopologyVersion(topVer, 0)) : null;
if (crd != null)
log.info("Assigned mvcc coordinator [crd=" + crd + ", crdNode=" + crdNode +']');
@@ -274,6 +274,14 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
}
/**
+ * @param topVer Topology version.
+ * @return Coordinator version.
+ */
+ private long coordinatorVersion(long topVer) {
+ return topVer + ctx.discovery().gridStartTime();
+ }
+
+ /**
* @param log Logger.
*/
public void dumpStatistics(IgniteLogger log) {
@@ -1022,7 +1030,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
log.info("Initialize local node as mvcc coordinator [node=" + ctx.localNodeId() +
", topVer=" + topVer + ']');
- crdVer = topVer.topologyVersion() + ctx.discovery().gridStartTime();
+ crdVer = coordinatorVersion(topVer.topologyVersion());
prevCrdQueries.init(activeQueries, discoCache, ctx.discovery());
[08/10] ignite git commit: ignite-5932
Posted by sb...@apache.org.
ignite-5932
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d6670e8c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d6670e8c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d6670e8c
Branch: refs/heads/ignite-5932
Commit: d6670e8c80b225e29c04150f10a26c51de66b0f6
Parents: af88754
Author: sboikov <sb...@gridgain.com>
Authored: Thu Oct 12 13:43:49 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Oct 12 13:43:49 2017 +0300
----------------------------------------------------------------------
.../communication/GridIoMessageFactory.java | 22 +-
.../near/GridNearTxFinishAndAckFuture.java | 16 +-
.../cache/distributed/near/GridNearTxLocal.java | 4 +
.../cache/mvcc/CacheCoordinatorsProcessor.java | 95 ++++++---
.../cache/mvcc/CoordinatorAckRequestQuery.java | 130 ++++++++++++
.../cache/mvcc/CoordinatorAckRequestTx.java | 201 +++++++++++++++++++
.../mvcc/CoordinatorAckRequestTxAndQuery.java | 120 +++++++++++
.../mvcc/CoordinatorAckRequestTxAndQueryEx.java | 144 +++++++++++++
.../cache/mvcc/CoordinatorQueryAckRequest.java | 130 ------------
.../cache/mvcc/CoordinatorTxAckRequest.java | 194 ------------------
.../processors/cache/mvcc/MvccCounter.java | 2 +-
.../processors/cache/mvcc/MvccQueryTracker.java | 37 ++++
.../cache/mvcc/PreviousCoordinatorQueries.java | 16 +-
.../cache/mvcc/CacheMvccTransactionsTest.java | 8 +-
14 files changed, 747 insertions(+), 372 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 99bc8af..6a59c24 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -103,10 +103,12 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFi
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearUnlockRequest;
-import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorQueryAckRequest;
-import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorQueryVersionRequest;
-import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorTxAckRequest;
+import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorAckRequestTxAndQueryEx;
import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorFutureResponse;
+import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorAckRequestQuery;
+import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorQueryVersionRequest;
+import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorAckRequestTx;
+import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorAckRequestTxAndQuery;
import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorTxCounterRequest;
import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorWaitTxsRequest;
import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersionResponse;
@@ -892,7 +894,7 @@ public class GridIoMessageFactory implements MessageFactory {
break;
case 131: // TODO IGNITE-3478 fix constants.
- msg = new CoordinatorTxAckRequest();
+ msg = new CoordinatorAckRequestTx();
break;
@@ -907,7 +909,7 @@ public class GridIoMessageFactory implements MessageFactory {
break;
case 134:
- msg = new CoordinatorQueryAckRequest();
+ msg = new CoordinatorAckRequestQuery();
break;
@@ -937,6 +939,16 @@ public class GridIoMessageFactory implements MessageFactory {
return msg;
case 141:
+ msg = new CoordinatorAckRequestTxAndQuery();
+
+ return msg;
+
+ case 142:
+ msg = new CoordinatorAckRequestTxAndQueryEx();
+
+ return msg;
+
+ case 143:
msg = new MvccCounter();
return msg;
http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
index c24551b..5d8b77c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -53,12 +54,21 @@ public class GridNearTxFinishAndAckFuture extends GridFutureAdapter<IgniteIntern
@Override public void apply(final GridNearTxFinishFuture fut) {
GridNearTxLocal tx = fut.tx();
+ IgniteInternalFuture<Void> ackFut = null;
+
+ MvccQueryTracker qryTracker = tx.mvccQueryTracker();
+
TxMvccInfo mvccInfo = tx.mvccInfo();
- if (mvccInfo != null) {
- IgniteInternalFuture<Void> ackFut = fut.context().coordinators().ackTxCommit(
- mvccInfo.coordinator(), mvccInfo.version());
+ if (qryTracker != null)
+ ackFut = qryTracker.onTxFinish(mvccInfo, fut.context());
+ else if (mvccInfo != null) {
+ ackFut = fut.context().coordinators().ackTxCommit(mvccInfo.coordinator(),
+ mvccInfo.version(),
+ null);
+ }
+ if (ackFut != null) {
ackFut.listen(new IgniteInClosure<IgniteInternalFuture<Void>>() {
@Override public void apply(IgniteInternalFuture<Void> ackFut) {
Exception err = null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 08f20de..c774f93 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -237,6 +237,10 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
trackTimeout = cctx.time().addTimeoutObject(this);
}
+ MvccQueryTracker mvccQueryTracker() {
+ return mvccTracker;
+ }
+
/** {@inheritDoc} */
@Override public boolean near() {
return true;
http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
index b89ce73..59eae1b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
@@ -176,7 +176,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
statCntrs[0] = new CounterWithAvg("CoordinatorTxCounterRequest", "avgTxs");
statCntrs[1] = new CounterWithAvg("MvccCoordinatorVersionResponse", "avgFutTime");
- statCntrs[2] = new StatCounter("CoordinatorTxAckRequest");
+ statCntrs[2] = new StatCounter("CoordinatorAckRequestTx");
statCntrs[3] = new CounterWithAvg("CoordinatorTxAckResponse", "avgFutTime");
statCntrs[4] = new StatCounter("TotalRequests");
statCntrs[5] = new StatCounter("CoordinatorWaitTxsRequest");
@@ -331,20 +331,9 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
public void ackQueryDone(MvccCoordinator crd, MvccCoordinatorVersion mvccVer) {
assert crd != null;
- long trackCntr = mvccVer.counter();
-
- MvccLongList txs = mvccVer.activeTransactions();
+ long trackCntr = queryTrackCounter(mvccVer);
- if (txs != null) {
- for (int i = 0; i < txs.size(); i++) {
- long txId = txs.get(i);
-
- if (txId < trackCntr)
- trackCntr = txId;
- }
- }
-
- Message msg = crd.coordinatorVersion() == mvccVer.coordinatorVersion() ? new CoordinatorQueryAckRequest(trackCntr) :
+ Message msg = crd.coordinatorVersion() == mvccVer.coordinatorVersion() ? new CoordinatorAckRequestQuery(trackCntr) :
new NewCoordinatorQueryAckRequest(mvccVer.coordinatorVersion(), trackCntr);
try {
@@ -363,6 +352,27 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
}
/**
+ * @param mvccVer Read version.
+ * @return
+ */
+ private long queryTrackCounter(MvccCoordinatorVersion mvccVer) {
+ long trackCntr = mvccVer.counter();
+
+ MvccLongList txs = mvccVer.activeTransactions();
+
+ int size = txs.size();
+
+ for (int i = 0; i < size; i++) {
+ long txId = txs.get(i);
+
+ if (txId < trackCntr)
+ trackCntr = txId;
+ }
+
+ return trackCntr;
+ }
+
+ /**
* @param crd Coordinator.
* @return Counter request future.
*/
@@ -422,22 +432,42 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
/**
* @param crd Coordinator.
- * @param mvccVer Transaction version.
+ * @param updateVer Transaction update version.
+ * @param readVer Transaction read version.
* @return Acknowledge future.
*/
- public IgniteInternalFuture<Void> ackTxCommit(UUID crd, MvccCoordinatorVersion mvccVer) {
+ public IgniteInternalFuture<Void> ackTxCommit(UUID crd,
+ MvccCoordinatorVersion updateVer,
+ @Nullable MvccCoordinatorVersion readVer) {
assert crd != null;
- assert mvccVer != null;
+ assert updateVer != null;
WaitAckFuture fut = new WaitAckFuture(futIdCntr.incrementAndGet(), crd, true);
ackFuts.put(fut.id, fut);
+ MvccCoordinatorMessage msg;
+
+ if (readVer != null) {
+ long trackCntr = queryTrackCounter(readVer);
+
+ if (readVer.coordinatorVersion() == updateVer.coordinatorVersion()) {
+ msg = new CoordinatorAckRequestTxAndQuery(fut.id,
+ updateVer.counter(),
+ trackCntr);
+ }
+ else {
+ msg = new CoordinatorAckRequestTxAndQueryEx(fut.id,
+ updateVer.counter(),
+ readVer.coordinatorVersion(),
+ trackCntr);
+ }
+ }
+ else
+ msg = new CoordinatorAckRequestTx(fut.id, updateVer.counter());
+
try {
- ctx.io().sendToGridTopic(crd,
- MSG_TOPIC,
- new CoordinatorTxAckRequest(fut.id, mvccVer.counter()),
- MSG_POLICY);
+ ctx.io().sendToGridTopic(crd, MSG_TOPIC, msg, MSG_POLICY);
}
catch (IgniteCheckedException e) {
if (ackFuts.remove(fut.id) != null) {
@@ -456,7 +486,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
* @param mvccVer Transaction version.
*/
public void ackTxRollback(UUID crdId, MvccCoordinatorVersion mvccVer) {
- CoordinatorTxAckRequest msg = new CoordinatorTxAckRequest(0, mvccVer.counter());
+ CoordinatorAckRequestTx msg = new CoordinatorAckRequestTx(0, mvccVer.counter());
msg.skipResponse(true);
@@ -568,7 +598,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
* @param nodeId Node ID.
* @param msg Message.
*/
- private void processCoordinatorQueryAckRequest(UUID nodeId, CoordinatorQueryAckRequest msg) {
+ private void processCoordinatorQueryAckRequest(UUID nodeId, CoordinatorAckRequestQuery msg) {
onQueryDone(nodeId, msg.counter());
}
@@ -577,16 +607,23 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
* @param msg Message.
*/
private void processNewCoordinatorQueryAckRequest(UUID nodeId, NewCoordinatorQueryAckRequest msg) {
- prevCrdQueries.onQueryDone(nodeId, msg);
+ prevCrdQueries.onQueryDone(nodeId, msg.coordinatorVersion(), msg.counter());
}
/**
* @param nodeId Sender node ID.
* @param msg Message.
*/
- private void processCoordinatorTxAckRequest(UUID nodeId, CoordinatorTxAckRequest msg) {
+ private void processCoordinatorTxAckRequest(UUID nodeId, CoordinatorAckRequestTx msg) {
onTxDone(msg.txCounter());
+ if (msg.queryCounter() != COUNTER_NA) {
+ if (msg.queryCoordinatorVersion() == 0)
+ onQueryDone(nodeId, msg.queryCounter());
+ else
+ prevCrdQueries.onQueryDone(nodeId, msg.queryCoordinatorVersion(), msg.queryCounter());
+ }
+
if (STAT_CNTRS)
statCntrs[2].update();
@@ -1238,12 +1275,12 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
if (msg instanceof CoordinatorTxCounterRequest)
processCoordinatorTxCounterRequest(nodeId, (CoordinatorTxCounterRequest)msg);
- else if (msg instanceof CoordinatorTxAckRequest)
- processCoordinatorTxAckRequest(nodeId, (CoordinatorTxAckRequest)msg);
+ else if (msg instanceof CoordinatorAckRequestTx)
+ processCoordinatorTxAckRequest(nodeId, (CoordinatorAckRequestTx)msg);
else if (msg instanceof CoordinatorFutureResponse)
processCoordinatorAckResponse(nodeId, (CoordinatorFutureResponse)msg);
- else if (msg instanceof CoordinatorQueryAckRequest)
- processCoordinatorQueryAckRequest(nodeId, (CoordinatorQueryAckRequest)msg);
+ else if (msg instanceof CoordinatorAckRequestQuery)
+ processCoordinatorQueryAckRequest(nodeId, (CoordinatorAckRequestQuery)msg);
else if (msg instanceof CoordinatorQueryVersionRequest)
processCoordinatorQueryVersionRequest(nodeId, (CoordinatorQueryVersionRequest)msg);
else if (msg instanceof MvccCoordinatorVersionResponse)
http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestQuery.java
new file mode 100644
index 0000000..e51ec90
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestQuery.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ *
+ */
+public class CoordinatorAckRequestQuery implements MvccCoordinatorMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private long cntr;
+
+ /**
+ * Required by {@link GridIoMessageFactory}.
+ */
+ public CoordinatorAckRequestQuery() {
+ // No-op.
+ }
+
+ /**
+ * @param cntr Query counter.
+ */
+ CoordinatorAckRequestQuery(long cntr) {
+ this.cntr = cntr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean waitForCoordinatorInit() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean processedFromNioThread() {
+ return true;
+ }
+
+ /**
+ * @return Counter.
+ */
+ public long counter() {
+ return cntr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeLong("cntr", cntr))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ cntr = reader.readLong("cntr");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(CoordinatorAckRequestQuery.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 134;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CoordinatorAckRequestQuery.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java
new file mode 100644
index 0000000..a3904fb
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ *
+ */
+public class CoordinatorAckRequestTx implements MvccCoordinatorMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private static final int SKIP_RESPONSE_FLAG_MASK = 0x01;
+
+ /** */
+ private long futId;
+
+ /** */
+ private long txCntr;
+
+ /** */
+ private byte flags;
+
+ /**
+ * Required by {@link GridIoMessageFactory}.
+ */
+ public CoordinatorAckRequestTx() {
+ // No-op.
+ }
+
+ /**
+ * @param futId Future ID.
+ * @param txCntr Counter assigned to transaction.
+ */
+ CoordinatorAckRequestTx(long futId, long txCntr) {
+ this.futId = futId;
+ this.txCntr = txCntr;
+ }
+
+ long queryCounter() {
+ return CacheCoordinatorsProcessor.COUNTER_NA;
+ }
+
+ long queryCoordinatorVersion() {
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean waitForCoordinatorInit() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean processedFromNioThread() {
+ return true;
+ }
+
+ /**
+ * @return Future ID.
+ */
+ long futureId() {
+ return futId;
+ }
+
+ /**
+ * @return {@code True} if response message is not needed.
+ */
+ boolean skipResponse() {
+ return (flags & SKIP_RESPONSE_FLAG_MASK) != 0;
+ }
+
+ /**
+ * @param val {@code True} if response message is not needed.
+ */
+ void skipResponse(boolean val) {
+ if (val)
+ flags |= SKIP_RESPONSE_FLAG_MASK;
+ else
+ flags &= ~SKIP_RESPONSE_FLAG_MASK;
+ }
+
+ /**
+ * @return Counter assigned tp transaction.
+ */
+ public long txCounter() {
+ return txCntr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeByte("flags", flags))
+ return false;
+
+ writer.incrementState();
+
+ case 1:
+ if (!writer.writeLong("futId", futId))
+ return false;
+
+ writer.incrementState();
+
+ case 2:
+ if (!writer.writeLong("txCntr", txCntr))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ flags = reader.readByte("flags");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 1:
+ futId = reader.readLong("futId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 2:
+ txCntr = reader.readLong("txCntr");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(CoordinatorAckRequestTx.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 131;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 3;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CoordinatorAckRequestTx.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQuery.java
new file mode 100644
index 0000000..91f27b2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQuery.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ *
+ */
+public class CoordinatorAckRequestTxAndQuery extends CoordinatorAckRequestTx {
+ /** */
+ private long qryCntr;
+
+ /**
+ * Required by {@link GridIoMessageFactory}.
+ */
+ public CoordinatorAckRequestTxAndQuery() {
+ // No-op.
+ }
+
+ /**
+ * @param futId Future ID.
+ * @param txCntr Counter assigned to transaction update.
+ * @param qryCntr Counter assigned for transaction reads.
+ */
+ CoordinatorAckRequestTxAndQuery(long futId, long txCntr, long qryCntr) {
+ super(futId, txCntr);
+
+ this.qryCntr = qryCntr;
+ }
+
+ /** {@inheritDoc} */
+ @Override long queryCounter() {
+ return qryCntr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 3:
+ if (!writer.writeLong("qryCntr", qryCntr))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 3:
+ qryCntr = reader.readLong("qryCntr");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(CoordinatorAckRequestTxAndQuery.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 141;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 4;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CoordinatorAckRequestTxAndQuery.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQueryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQueryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQueryEx.java
new file mode 100644
index 0000000..1808697
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQueryEx.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ *
+ */
+public class CoordinatorAckRequestTxAndQueryEx extends CoordinatorAckRequestTx {
+ /** */
+ private long qryCrdVer;
+
+ /** */
+ private long qryCntr;
+
+ /**
+ * Required by {@link GridIoMessageFactory}.
+ */
+ public CoordinatorAckRequestTxAndQueryEx() {
+ // No-op.
+ }
+
+ /**
+ * @param futId Future ID.
+ * @param txCntr Counter assigned to transaction update.
+ * @param qryCrdVer Version of coordinator assigned read counter.
+ * @param qryCntr Counter assigned for transaction reads.
+ */
+ CoordinatorAckRequestTxAndQueryEx(long futId, long txCntr, long qryCrdVer, long qryCntr) {
+ super(futId, txCntr);
+
+ this.qryCrdVer = qryCrdVer;
+ this.qryCntr = qryCntr;
+ }
+
+ /** {@inheritDoc} */
+ @Override long queryCoordinatorVersion() {
+ return qryCrdVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override long queryCounter() {
+ return qryCntr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 3:
+ if (!writer.writeLong("qryCntr", qryCntr))
+ return false;
+
+ writer.incrementState();
+
+ case 4:
+ if (!writer.writeLong("qryCrdVer", qryCrdVer))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 3:
+ qryCntr = reader.readLong("qryCntr");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 4:
+ qryCrdVer = reader.readLong("qryCrdVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(CoordinatorAckRequestTxAndQueryEx.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 142;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 5;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CoordinatorAckRequestTxAndQueryEx.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryAckRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryAckRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryAckRequest.java
deleted file mode 100644
index 602d3b4..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryAckRequest.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.mvcc;
-
-import java.nio.ByteBuffer;
-import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- *
- */
-public class CoordinatorQueryAckRequest implements MvccCoordinatorMessage {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private long cntr;
-
- /**
- * Required by {@link GridIoMessageFactory}.
- */
- public CoordinatorQueryAckRequest() {
- // No-op.
- }
-
- /**
- * @param cntr Query counter.
- */
- CoordinatorQueryAckRequest(long cntr) {
- this.cntr = cntr;
- }
-
- /** {@inheritDoc} */
- @Override public boolean waitForCoordinatorInit() {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean processedFromNioThread() {
- return true;
- }
-
- /**
- * @return Counter.
- */
- public long counter() {
- return cntr;
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType(), fieldsCount()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 0:
- if (!writer.writeLong("cntr", cntr))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!reader.beforeMessageRead())
- return false;
-
- switch (reader.state()) {
- case 0:
- cntr = reader.readLong("cntr");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return reader.afterMessageRead(CoordinatorQueryAckRequest.class);
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return 134;
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 1;
- }
-
- /** {@inheritDoc} */
- @Override public void onAckReceived() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(CoordinatorQueryAckRequest.class, this);
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java
deleted file mode 100644
index 14cd6a9..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.mvcc;
-
-import java.nio.ByteBuffer;
-import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- *
- */
-public class CoordinatorTxAckRequest implements MvccCoordinatorMessage {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private static final int SKIP_RESPONSE_FLAG_MASK = 0x01;
-
- /** */
- private long futId;
-
- /** */
- private long txCntr;
-
- /** */
- private byte flags;
-
- /**
- * Required by {@link GridIoMessageFactory}.
- */
- public CoordinatorTxAckRequest() {
- // No-op.
- }
-
- /**
- * @param futId Future ID.
- * @param txCntr Counter assigned to transaction.
- */
- CoordinatorTxAckRequest(long futId, long txCntr) {
- this.futId = futId;
- this.txCntr = txCntr;
- }
-
- /** {@inheritDoc} */
- @Override public boolean waitForCoordinatorInit() {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean processedFromNioThread() {
- return true;
- }
-
- /**
- * @return Future ID.
- */
- long futureId() {
- return futId;
- }
-
- /**
- * @return {@code True} if response message is not needed.
- */
- boolean skipResponse() {
- return (flags & SKIP_RESPONSE_FLAG_MASK) != 0;
- }
-
- /**
- * @param val {@code True} if response message is not needed.
- */
- void skipResponse(boolean val) {
- if (val)
- flags |= SKIP_RESPONSE_FLAG_MASK;
- else
- flags &= ~SKIP_RESPONSE_FLAG_MASK;
- }
-
- /**
- * @return Counter assigned tp transaction.
- */
- public long txCounter() {
- return txCntr;
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType(), fieldsCount()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 0:
- if (!writer.writeByte("flags", flags))
- return false;
-
- writer.incrementState();
-
- case 1:
- if (!writer.writeLong("futId", futId))
- return false;
-
- writer.incrementState();
-
- case 2:
- if (!writer.writeLong("txCntr", txCntr))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!reader.beforeMessageRead())
- return false;
-
- switch (reader.state()) {
- case 0:
- flags = reader.readByte("flags");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 1:
- futId = reader.readLong("futId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 2:
- txCntr = reader.readLong("txCntr");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return reader.afterMessageRead(CoordinatorTxAckRequest.class);
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return 131;
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 3;
- }
-
- /** {@inheritDoc} */
- @Override public void onAckReceived() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(CoordinatorTxAckRequest.class, this);
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java
index bec3301..33407b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java
@@ -143,7 +143,7 @@ public class MvccCounter implements Message {
/** {@inheritDoc} */
@Override public short directType() {
- return 141;
+ return 143;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
index 8c421fc..e45b77c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteInClosure;
@@ -115,6 +116,42 @@ public class MvccQueryTracker {
cctx.shared().coordinators().ackQueryDone(mvccCrd0, mvccVer0);
}
+ public IgniteInternalFuture<Void> onTxFinish(@Nullable TxMvccInfo mvccInfo, GridCacheSharedContext ctx) {
+ MvccCoordinator mvccCrd0 = null;
+ MvccCoordinatorVersion mvccVer0 = null;
+
+ synchronized (this) {
+ if (mvccVer != null) {
+ assert mvccCrd != null;
+
+ mvccCrd0 = mvccCrd;
+ mvccVer0 = mvccVer;
+
+ mvccVer = null; // Mark as finished.
+ }
+ }
+
+ if (mvccVer0 != null) {
+ if (mvccInfo == null) {
+ cctx.shared().coordinators().ackQueryDone(mvccCrd0, mvccVer0);
+
+ return null;
+ }
+ else if (mvccInfo.coordinator().equals(mvccCrd0.nodeId()))
+ return ctx.coordinators().ackTxCommit(mvccInfo.coordinator(), mvccInfo.version(), mvccVer0);
+ else {
+ cctx.shared().coordinators().ackQueryDone(mvccCrd0, mvccVer0);
+
+ return ctx.coordinators().ackTxCommit(mvccInfo.coordinator(), mvccInfo.version(), null);
+ }
+ }
+
+ if (mvccInfo != null)
+ return ctx.coordinators().ackTxCommit(mvccInfo.coordinator(), mvccInfo.version(), null);
+
+ return null;
+ }
+
/**
* @param topVer Topology version.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
index 700b27d..667865b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
@@ -158,23 +158,27 @@ class PreviousCoordinatorQueries {
/**
* @param nodeId Node ID.
- * @param msg Message.
+ * @param crdVer Coordinator version.
+ * @param cntr Counter.
*/
- void onQueryDone(UUID nodeId, NewCoordinatorQueryAckRequest msg) {
+ void onQueryDone(UUID nodeId, long crdVer, long cntr) {
+ assert crdVer != 0;
+ assert cntr != CacheCoordinatorsProcessor.COUNTER_NA;
+
synchronized (this) {
- MvccCounter cntr = new MvccCounter(msg.coordinatorVersion(), msg.counter());
+ MvccCounter mvccCntr = new MvccCounter(crdVer, cntr);
Map<MvccCounter, Integer> nodeQueries = activeQueries.get(nodeId);
if (nodeQueries == null)
activeQueries.put(nodeId, nodeQueries = new HashMap<>());
- Integer qryCnt = nodeQueries.get(cntr);
+ Integer qryCnt = nodeQueries.get(mvccCntr);
int newQryCnt = (qryCnt != null ? qryCnt : 0) - 1;
if (newQryCnt == 0) {
- nodeQueries.remove(cntr);
+ nodeQueries.remove(mvccCntr);
if (nodeQueries.isEmpty()) {
activeQueries.remove(nodeId);
@@ -184,7 +188,7 @@ class PreviousCoordinatorQueries {
}
}
else
- nodeQueries.put(cntr, newQryCnt);
+ nodeQueries.put(mvccCntr, newQryCnt);
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index 82201ea..8964cd4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -655,7 +655,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
boolean block = true;
@Override public boolean apply(ClusterNode node, Message msg) {
- if (block && msg instanceof CoordinatorTxAckRequest) {
+ if (block && msg instanceof CoordinatorAckRequestTx) {
block = false;
return true;
@@ -991,7 +991,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
clientSpi.closure(new IgniteBiInClosure<ClusterNode, Message>() {
@Override public void apply(ClusterNode node, Message msg) {
- if (msg instanceof CoordinatorTxAckRequest)
+ if (msg instanceof CoordinatorAckRequestTx)
doSleep(2000);
}
});
@@ -1110,7 +1110,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
private boolean blocked;
@Override public boolean apply(ClusterNode node, Message msg) {
- if (!blocked && (msg instanceof CoordinatorTxAckRequest)) {
+ if (!blocked && (msg instanceof CoordinatorAckRequestTx)) {
blocked = true;
return true;
@@ -2055,7 +2055,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
srvSpi.blockMessages(GridNearGetResponse.class, getTestIgniteInstanceName(1));
- TestRecordingCommunicationSpi.spi(client).blockMessages(CoordinatorQueryAckRequest.class,
+ TestRecordingCommunicationSpi.spi(client).blockMessages(CoordinatorAckRequestQuery.class,
getTestIgniteInstanceName(0));
IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {