You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/13 16:35:35 UTC
[43/50] [abbrv] ignite git commit: ignite-4705 Atomic cache protocol change: notify client node from backups
ignite-4705 Atomic cache protocol change: notify client node from backups
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cbc472fe
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cbc472fe
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cbc472fe
Branch: refs/heads/ignite-4768-1
Commit: cbc472fe7f058db42ce49652c85981c7b797d229
Parents: f59f46d
Author: sboikov <sb...@gridgain.com>
Authored: Mon Mar 13 18:07:20 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Mar 13 18:08:50 2017 +0300
----------------------------------------------------------------------
.../rest/protocols/tcp/MockNioSession.java | 5 +-
.../org/apache/ignite/internal/IgnitionEx.java | 5 +-
.../internal/binary/BinaryObjectImpl.java | 43 +-
.../connection/GridClientNioTcpConnection.java | 2 +-
.../managers/communication/GridIoManager.java | 5 +-
.../communication/GridIoMessageFactory.java | 20 +-
.../processors/cache/CacheObjectContext.java | 3 +-
.../processors/cache/GridCacheAtomicFuture.java | 5 +-
.../processors/cache/GridCacheIoManager.java | 83 +-
.../processors/cache/GridCacheMapEntry.java | 12 +-
.../processors/cache/GridCacheMessage.java | 17 +-
.../processors/cache/GridCacheMvccManager.java | 48 +-
.../processors/cache/GridCacheProcessor.java | 1 -
.../processors/cache/GridCacheReturn.java | 6 +-
.../cache/GridDeferredAckMessageSender.java | 17 +-
.../processors/cache/KeyCacheObjectImpl.java | 65 +-
.../dht/GridClientPartitionTopology.java | 8 +
.../dht/GridDhtPartitionTopology.java | 9 +
.../dht/GridDhtPartitionTopologyImpl.java | 23 +-
.../GridDhtAtomicAbstractUpdateFuture.java | 298 +++---
.../GridDhtAtomicAbstractUpdateRequest.java | 392 +++++++-
.../dht/atomic/GridDhtAtomicCache.java | 896 +++++++++--------
.../GridDhtAtomicDeferredUpdateResponse.java | 68 +-
.../dht/atomic/GridDhtAtomicNearResponse.java | 314 ++++++
.../atomic/GridDhtAtomicSingleUpdateFuture.java | 101 +-
.../GridDhtAtomicSingleUpdateRequest.java | 277 +-----
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 89 +-
.../dht/atomic/GridDhtAtomicUpdateRequest.java | 325 ++-----
.../dht/atomic/GridDhtAtomicUpdateResponse.java | 124 +--
...idNearAtomicAbstractSingleUpdateRequest.java | 481 +---------
.../GridNearAtomicAbstractUpdateFuture.java | 468 +++++++--
.../GridNearAtomicAbstractUpdateRequest.java | 480 ++++++++-
.../GridNearAtomicCheckUpdateRequest.java | 175 ++++
.../atomic/GridNearAtomicFullUpdateRequest.java | 487 +---------
...GridNearAtomicSingleUpdateFilterRequest.java | 23 +-
.../GridNearAtomicSingleUpdateFuture.java | 617 ++++++------
...GridNearAtomicSingleUpdateInvokeRequest.java | 37 +-
.../GridNearAtomicSingleUpdateRequest.java | 65 +-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 962 +++++++++++--------
.../atomic/GridNearAtomicUpdateResponse.java | 192 ++--
.../distributed/dht/atomic/UpdateErrors.java | 222 +++++
.../distributed/near/GridNearAtomicCache.java | 27 +-
.../continuous/CacheContinuousQueryHandler.java | 2 +-
.../cacheobject/IgniteCacheObjectProcessor.java | 5 +-
.../IgniteCacheObjectProcessorImpl.java | 18 +-
.../ignite/internal/util/StripedExecutor.java | 8 +-
.../util/future/GridCompoundFuture.java | 11 +-
.../internal/util/ipc/IpcToNioAdapter.java | 7 +-
.../nio/GridConnectionBytesVerifyFilter.java | 7 +-
.../util/nio/GridNioAsyncNotifyFilter.java | 7 +-
.../internal/util/nio/GridNioCodecFilter.java | 9 +-
.../util/nio/GridNioEmbeddedFuture.java | 7 +
.../ignite/internal/util/nio/GridNioFilter.java | 12 +-
.../internal/util/nio/GridNioFilterAdapter.java | 7 +-
.../internal/util/nio/GridNioFilterChain.java | 15 +-
.../util/nio/GridNioFinishedFuture.java | 5 -
.../ignite/internal/util/nio/GridNioFuture.java | 7 -
.../internal/util/nio/GridNioFutureImpl.java | 18 +-
.../ignite/internal/util/nio/GridNioServer.java | 83 +-
.../internal/util/nio/GridNioSession.java | 7 +-
.../internal/util/nio/GridNioSessionImpl.java | 9 +-
.../util/nio/GridNioSessionMetaKey.java | 5 +-
.../util/nio/GridShmemCommunicationClient.java | 6 +-
.../util/nio/GridTcpNioCommunicationClient.java | 13 +-
.../internal/util/nio/SessionWriteRequest.java | 7 -
.../internal/util/nio/ssl/GridNioSslFilter.java | 12 +-
.../util/nio/ssl/GridNioSslHandler.java | 29 +-
.../communication/tcp/TcpCommunicationSpi.java | 2 +-
.../org/apache/ignite/thread/IgniteThread.java | 41 +-
.../ignite/thread/IgniteThreadFactory.java | 2 +-
.../GridCommunicationSendMessageSelfTest.java | 2 +-
.../cache/CacheRebalancingSelfTest.java | 16 +-
.../GridCacheAbstractFailoverSelfTest.java | 2 -
.../GridCacheAtomicMessageCountSelfTest.java | 22 +-
.../IgniteCacheEntryListenerAbstractTest.java | 1 +
...niteCacheClientNodeChangingTopologyTest.java | 7 -
.../IgniteCacheMessageRecoveryAbstractTest.java | 2 +-
.../dht/GridCacheAtomicNearCacheSelfTest.java | 23 +-
.../IgniteCachePutRetryAbstractSelfTest.java | 37 +-
...gniteCachePutRetryTransactionalSelfTest.java | 2 +-
.../atomic/IgniteCacheAtomicProtocolTest.java | 883 +++++++++++++++++
...erNoStripedPoolMultiNodeFullApiSelfTest.java | 35 -
.../near/GridCacheNearReadersSelfTest.java | 17 +-
...edNoStripedPoolMultiNodeFullApiSelfTest.java | 35 -
...eContinuousQueryAsyncFilterListenerTest.java | 2 +-
...ContinuousQueryFailoverAbstractSelfTest.java | 31 +-
...eCacheContinuousQueryImmutableEntryTest.java | 2 +-
.../nio/GridNioEmbeddedFutureSelfTest.java | 2 +-
.../util/future/nio/GridNioFutureSelfTest.java | 25 +-
.../nio/impl/GridNioFilterChainSelfTest.java | 12 +-
.../file/GridFileSwapSpaceSpiSelfTest.java | 2 +-
.../IgniteCacheFullApiSelfTestSuite.java | 8 +-
.../testsuites/IgniteCacheTestSuite5.java | 3 +
.../HadoopExternalCommunication.java | 9 +-
.../communication/HadoopIpcToNioAdapter.java | 7 +-
.../communication/HadoopMarshallerFilter.java | 10 +-
.../cache/IgniteGetAndPutBenchmark.java | 2 +-
.../cache/IgniteGetAndPutTxBenchmark.java | 2 +-
98 files changed, 5462 insertions(+), 3597 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java
index 9bc4e7f..9d1755f 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java
@@ -19,11 +19,13 @@ package org.apache.ignite.internal.processors.rest.protocols.tcp;
import java.net.InetSocketAddress;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter;
import org.apache.ignite.internal.util.nio.GridNioFinishedFuture;
import org.apache.ignite.internal.util.nio.GridNioFuture;
import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.lang.IgniteInClosure;
import org.jetbrains.annotations.Nullable;
/**
@@ -112,7 +114,8 @@ public class MockNioSession extends GridMetadataAwareAdapter implements GridNioS
}
/** {@inheritDoc} */
- @Override public void sendNoFuture(Object msg) throws IgniteCheckedException {
+ @Override public void sendNoFuture(Object msg, @Nullable IgniteInClosure<IgniteException> ackC)
+ throws IgniteCheckedException {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 2d35cdb..f6cfe12 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -1703,8 +1703,9 @@ public class IgnitionEx {
sysExecSvc.allowCoreThreadTimeOut(true);
- if (cfg.getStripedPoolSize() > 0)
- stripedExecSvc = new StripedExecutor(cfg.getStripedPoolSize(), cfg.getIgniteInstanceName(), "sys", log);
+ validateThreadPoolSize(cfg.getStripedPoolSize(), "stripedPool");
+
+ stripedExecSvc = new StripedExecutor(cfg.getStripedPoolSize(), cfg.getIgniteInstanceName(), "sys", log);
// Note that since we use 'LinkedBlockingQueue', number of
// maximum threads has no effect.
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
index 7a81659..6fe1a3b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
@@ -17,6 +17,17 @@
package org.apache.ignite.internal.binary;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Date;
+import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjectException;
@@ -33,19 +44,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.util.Date;
-import java.util.UUID;
-
-import static java.nio.charset.StandardCharsets.*;
+import static java.nio.charset.StandardCharsets.UTF_8;
/**
* Binary object implementation.
@@ -74,7 +73,6 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern
private boolean detachAllowed;
/** */
- @GridDirectTransient
private int part = -1;
/**
@@ -561,7 +559,6 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern
start = in.readInt();
}
-
/** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);
@@ -584,6 +581,12 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern
writer.incrementState();
case 1:
+ if (!writer.writeInt("part", part))
+ return false;
+
+ writer.incrementState();
+
+ case 2:
if (!writer.writeInt("start", detachAllowed ? 0 : start))
return false;
@@ -611,6 +614,14 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern
reader.incrementState();
case 1:
+ part = reader.readInt("part");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 2:
start = reader.readInt("start");
if (!reader.isLastRead())
@@ -620,7 +631,7 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern
}
- return true;
+ return reader.afterMessageRead(BinaryObjectImpl.class);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
index 8937504..d3a30fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
@@ -229,7 +229,7 @@ public class GridClientNioTcpConnection extends GridClientConnection {
GridNioFuture<?> sslHandshakeFut = null;
if (sslCtx != null) {
- sslHandshakeFut = new GridNioFutureImpl<>();
+ sslHandshakeFut = new GridNioFutureImpl<>(null);
meta.put(GridNioSslFilter.HANDSHAKE_FUT_META_KEY, sslHandshakeFut);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 0c0dbf7..23738d7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -818,10 +818,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
return;
}
- if (ctx.config().getStripedPoolSize() > 0 &&
- plc == GridIoPolicy.SYSTEM_POOL &&
- msg.partition() != Integer.MIN_VALUE
- ) {
+ if (plc == GridIoPolicy.SYSTEM_POOL && msg.partition() != Integer.MIN_VALUE) {
ctx.getStripedExecutorService().execute(msg.partition(), c);
return;
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 6f95400..0548581 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -67,14 +67,17 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrep
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlockRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicCheckUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateInvokeRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.UpdateErrors;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
@@ -118,11 +121,11 @@ import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerResponse;
import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopDirectShuffleMessage;
import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleAck;
import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleFinishRequest;
import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleFinishResponse;
import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleMessage;
-import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopDirectShuffleMessage;
import org.apache.ignite.internal.processors.igfs.IgfsAckMessage;
import org.apache.ignite.internal.processors.igfs.IgfsBlockKey;
import org.apache.ignite.internal.processors.igfs.IgfsBlocksMessage;
@@ -173,6 +176,21 @@ public class GridIoMessageFactory implements MessageFactory {
Message msg = null;
switch (type) {
+ case -47:
+ msg = new GridNearAtomicCheckUpdateRequest();
+
+ break;
+
+ case -46:
+ msg = new UpdateErrors();
+
+ break;
+
+ case -45:
+ msg = new GridDhtAtomicNearResponse();
+
+ break;
+
case -44:
msg = new TcpCommunicationSpi.HandshakeMessage2();
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectContext.java
index c4203ef..a777ab6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectContext.java
@@ -33,7 +33,8 @@ import org.apache.ignite.internal.util.typedef.F;
/**
*
*/
-@SuppressWarnings("TypeMayBeWeakened") public class CacheObjectContext {
+@SuppressWarnings("TypeMayBeWeakened")
+public class CacheObjectContext {
/** */
private GridKernalContext kernalCtx;
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
index 3e11d50..8df229e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
@@ -19,16 +19,15 @@ package org.apache.ignite.internal.processors.cache;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
/**
* Update future for atomic cache.
*/
public interface GridCacheAtomicFuture<R> extends GridCacheFuture<R> {
/**
- * @return Future version.
+ * @return Future ID.
*/
- public GridCacheVersion version();
+ public Long id();
/**
* Gets future that will be completed when it is safe when update is finished on the given version of topology.
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 1f28201..1cd8fbe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -47,15 +47,18 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrep
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicCheckUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateInvokeRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.UpdateErrors;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
@@ -85,6 +88,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
@@ -213,15 +217,19 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
}
if (fut != null && !fut.isDone()) {
+ Thread curThread = Thread.currentThread();
+
+ final int stripe = curThread instanceof IgniteThread ? ((IgniteThread)curThread).stripe() : -1;
+
fut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> t) {
- cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+ Runnable c = new Runnable() {
@Override public void run() {
IgniteLogger log = cacheMsg.messageLogger(cctx);
if (log.isDebugEnabled()) {
StringBuilder msg0 = new StringBuilder("Process cache message after wait for " +
- "affinity topology version [");
+ "affinity topology version [");
appendMessageInfo(cacheMsg, nodeId, msg0).append(']');
@@ -230,7 +238,12 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
handleMessage(nodeId, cacheMsg);
}
- });
+ };
+
+ if (stripe >= 0)
+ cctx.kernalContext().getStripedExecutorService().execute(stripe, c);
+ else
+ cctx.kernalContext().closure().runLocalSafe(c);
}
});
@@ -471,15 +484,17 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
* @param cacheMsg Cache message.
* @return Atomic future ID if applicable for message.
*/
- @Nullable private GridCacheVersion atomicFututeId(GridCacheMessage cacheMsg) {
+ @Nullable private Long atomicFututeId(GridCacheMessage cacheMsg) {
if (cacheMsg instanceof GridNearAtomicAbstractUpdateRequest)
- return ((GridNearAtomicAbstractUpdateRequest)cacheMsg).futureVersion();
+ return ((GridNearAtomicAbstractUpdateRequest)cacheMsg).futureId();
else if (cacheMsg instanceof GridNearAtomicUpdateResponse)
- return ((GridNearAtomicUpdateResponse) cacheMsg).futureVersion();
+ return ((GridNearAtomicUpdateResponse) cacheMsg).futureId();
else if (cacheMsg instanceof GridDhtAtomicAbstractUpdateRequest)
- return ((GridDhtAtomicAbstractUpdateRequest)cacheMsg).futureVersion();
+ return ((GridDhtAtomicAbstractUpdateRequest)cacheMsg).futureId();
else if (cacheMsg instanceof GridDhtAtomicUpdateResponse)
- return ((GridDhtAtomicUpdateResponse) cacheMsg).futureVersion();
+ return ((GridDhtAtomicUpdateResponse) cacheMsg).futureId();
+ else if (cacheMsg instanceof GridNearAtomicCheckUpdateRequest)
+ return ((GridNearAtomicCheckUpdateRequest)cacheMsg).futureId();
return null;
}
@@ -490,9 +505,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
* @return Atomic future ID if applicable for message.
*/
@Nullable private GridCacheVersion atomicWriteVersion(GridCacheMessage cacheMsg) {
- if (cacheMsg instanceof GridNearAtomicAbstractUpdateRequest)
- return ((GridNearAtomicAbstractUpdateRequest)cacheMsg).updateVersion();
- else if (cacheMsg instanceof GridDhtAtomicAbstractUpdateRequest)
+ if (cacheMsg instanceof GridDhtAtomicAbstractUpdateRequest)
return ((GridDhtAtomicAbstractUpdateRequest)cacheMsg).writeVersion();
return null;
@@ -561,12 +574,25 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(
ctx.cacheId(),
- req.futureVersion(),
+ req.partition(),
+ req.futureId(),
ctx.deploymentEnabled());
res.onError(req.classError());
sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+
+ if (req.nearNodeId() != null) {
+ GridDhtAtomicNearResponse nearRes = new GridDhtAtomicNearResponse(ctx.cacheId(),
+ req.partition(),
+ req.nearFutureId(),
+ nodeId,
+ req.flags());
+
+ nearRes.errors(new UpdateErrors(req.classError()));
+
+ sendResponseOnFailedMessage(req.nearNodeId(), nearRes, cctx, ctx.ioPolicy());
+ }
}
break;
@@ -577,7 +603,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
ctx.cacheId(),
nodeId,
- req.futureVersion(),
+ req.futureId(),
+ req.partition(),
+ false,
ctx.deploymentEnabled());
res.error(req.classError());
@@ -755,7 +783,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
ctx.cacheId(),
nodeId,
- req.futureVersion(),
+ req.futureId(),
+ req.partition(),
+ false,
ctx.deploymentEnabled());
res.error(req.classError());
@@ -771,7 +801,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
ctx.cacheId(),
nodeId,
- req.futureVersion(),
+ req.futureId(),
+ req.partition(),
+ false,
ctx.deploymentEnabled());
res.error(req.classError());
@@ -787,7 +819,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
ctx.cacheId(),
nodeId,
- req.futureVersion(),
+ req.futureId(),
+ req.partition(),
+ false,
ctx.deploymentEnabled());
res.error(req.classError());
@@ -802,12 +836,25 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(
ctx.cacheId(),
- req.futureVersion(),
+ req.partition(),
+ req.futureId(),
ctx.deploymentEnabled());
res.onError(req.classError());
sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+
+ if (req.nearNodeId() != null) {
+ GridDhtAtomicNearResponse nearRes = new GridDhtAtomicNearResponse(ctx.cacheId(),
+ req.partition(),
+ req.nearFutureId(),
+ nodeId,
+ req.flags());
+
+ nearRes.errors(new UpdateErrors(req.classError()));
+
+ sendResponseOnFailedMessage(req.nearNodeId(), nearRes, cctx, ctx.ioPolicy());
+ }
}
break;
@@ -894,7 +941,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
*/
@SuppressWarnings("unchecked")
public void send(ClusterNode node, GridCacheMessage msg, byte plc) throws IgniteCheckedException {
- assert !node.isLocal();
+ assert !node.isLocal() : node;
if (!onSend(msg, node.id()))
return;
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 2237e22..54b4ed7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -2170,8 +2170,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
assert conflictCtx != null;
- boolean ignoreTime = cctx.config().getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.PRIMARY;
-
// Use old value?
if (conflictCtx.isUseOld()) {
GridCacheVersion newConflictVer = conflictVer != null ? conflictVer : newVer;
@@ -2180,7 +2178,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (!isNew() && // Not initial value,
verCheck && // and atomic version check,
oldConflictVer.dataCenterId() == newConflictVer.dataCenterId() && // and data centers are equal,
- ATOMIC_VER_COMPARATOR.compare(oldConflictVer, newConflictVer, ignoreTime) == 0 && // and both versions are equal,
+ ATOMIC_VER_COMPARATOR.compare(oldConflictVer, newConflictVer, true) == 0 && // and both versions are equal,
cctx.writeThrough() && // and store is enabled,
primary) // and we are primary.
{
@@ -2226,13 +2224,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
conflictVer = null;
}
- boolean ignoreTime = cctx.config().getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.PRIMARY;
-
// Perform version check only in case there was no explicit conflict resolution.
if (conflictCtx == null) {
if (verCheck) {
- if (!isNew() && ATOMIC_VER_COMPARATOR.compare(ver, newVer, ignoreTime) >= 0) {
- if (ATOMIC_VER_COMPARATOR.compare(ver, newVer, ignoreTime) == 0 && cctx.writeThrough() && primary) {
+ if (!isNew() && ATOMIC_VER_COMPARATOR.compare(ver, newVer, true) >= 0) {
+ if (ATOMIC_VER_COMPARATOR.compare(ver, newVer, true) == 0 && cctx.writeThrough() && primary) {
if (log.isDebugEnabled())
log.debug("Received entry update with same version as current (will update store) " +
"[entry=" + this + ", newVer=" + newVer + ']');
@@ -2307,7 +2303,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
}
else
- assert isNew() || ATOMIC_VER_COMPARATOR.compare(ver, newVer, ignoreTime) <= 0 :
+ assert isNew() || ATOMIC_VER_COMPARATOR.compare(ver, newVer, true) <= 0 :
"Invalid version for inner update [isNew=" + isNew() + ", entry=" + this + ", newVer=" + newVer + ']';
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
index 0646d5a..4de465c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
@@ -50,7 +50,7 @@ public abstract class GridCacheMessage implements Message {
private static final long serialVersionUID = 0L;
/** Maximum number of cache lookup indexes. */
- public static final int MAX_CACHE_MSG_LOOKUP_INDEX = 5;
+ public static final int MAX_CACHE_MSG_LOOKUP_INDEX = 7;
/** Cache message index field name. */
public static final String CACHE_MSG_INDEX_FIELD_NAME = "CACHE_MSG_IDX";
@@ -501,7 +501,7 @@ public abstract class GridCacheMessage implements Message {
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
- protected final void prepareMarshalCacheObjects(@Nullable List<? extends CacheObject> col,
+ public final void prepareMarshalCacheObjects(@Nullable List<? extends CacheObject> col,
GridCacheContext ctx) throws IgniteCheckedException {
if (col == null)
return;
@@ -553,7 +553,7 @@ public abstract class GridCacheMessage implements Message {
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
- protected final void finishUnmarshalCacheObjects(@Nullable List<? extends CacheObject> col,
+ public final void finishUnmarshalCacheObjects(@Nullable List<? extends CacheObject> col,
GridCacheContext ctx,
ClassLoader ldr)
throws IgniteCheckedException
@@ -701,6 +701,17 @@ public abstract class GridCacheMessage implements Message {
return reader.afterMessageRead(GridCacheMessage.class);
}
+ /**
+ * @param str Builder.
+ * @param name Flag name.
+ */
+ protected final void appendFlag(StringBuilder str, String name) {
+ if (str.length() > 0)
+ str.append('|');
+
+ str.append(name);
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridCacheMessage.class, this, "cacheId", cacheId);
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index 4ec13fc..dff2c88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -29,6 +29,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.events.DiscoveryEvent;
@@ -105,9 +106,11 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
@GridToStringExclude
private final ConcurrentMap<GridCacheVersion, Collection<GridCacheMvccFuture<?>>> mvccFuts = newMap();
+ /** */
+ private final AtomicLong atomicFutId = new AtomicLong(U.currentTimeMillis());
+
/** Pending atomic futures. */
- private final ConcurrentMap<GridCacheVersion, GridCacheAtomicFuture<?>> atomicFuts =
- new ConcurrentHashMap8<>();
+ private final ConcurrentHashMap8<Long, GridCacheAtomicFuture<?>> atomicFuts = new ConcurrentHashMap8<>();
/** Pending data streamer futures. */
private final GridConcurrentHashSet<DataStreamerFuture> dataStreamerFuts = new GridConcurrentHashSet<>();
@@ -253,10 +256,10 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
cacheFut.onNodeLeft(discoEvt.eventNode().id());
if (cacheFut.isCancelled() || cacheFut.isDone()) {
- GridCacheVersion futVer = cacheFut.version();
+ Long futId = cacheFut.id();
- if (futVer != null)
- atomicFuts.remove(futVer, cacheFut);
+ if (futId != null)
+ atomicFuts.remove(futId, cacheFut);
}
}
}
@@ -423,14 +426,21 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
}
/**
- * @param futVer Future ID.
+ * @return ID for atomic cache update future.
+ */
+ public long atomicFutureId() {
+ return atomicFutId.incrementAndGet();
+ }
+
+ /**
+ * @param futId Future ID.
* @param fut Future.
* @return {@code False} if future was forcibly completed with error.
*/
- public boolean addAtomicFuture(GridCacheVersion futVer, GridCacheAtomicFuture<?> fut) {
- IgniteInternalFuture<?> old = atomicFuts.put(futVer, fut);
+ public boolean addAtomicFuture(Long futId, GridCacheAtomicFuture<?> fut) {
+ IgniteInternalFuture<?> old = atomicFuts.put(futId, fut);
- assert old == null : "Old future is not null [futVer=" + futVer + ", fut=" + fut + ", old=" + old + ']';
+ assert old == null : "Old future is not null [futId=" + futId + ", fut=" + fut + ", old=" + old + ']';
return onFutureAdded(fut);
}
@@ -443,6 +453,13 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
}
/**
+ * @return Number of pending atomic futures.
+ */
+ public int atomicFuturesCount() {
+ return atomicFuts.size();
+ }
+
+ /**
* @return Collection of pending data streamer futures.
*/
public Collection<DataStreamerFuture> dataStreamerFutures() {
@@ -452,19 +469,19 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
/**
* Gets future by given future ID.
*
- * @param futVer Future ID.
+ * @param futId Future ID.
* @return Future.
*/
- @Nullable public IgniteInternalFuture<?> atomicFuture(GridCacheVersion futVer) {
- return atomicFuts.get(futVer);
+ @Nullable public IgniteInternalFuture<?> atomicFuture(Long futId) {
+ return atomicFuts.get(futId);
}
/**
- * @param futVer Future ID.
+ * @param futId Future ID.
* @return Removed future.
*/
- @Nullable public IgniteInternalFuture<?> removeAtomicFuture(GridCacheVersion futVer) {
- return atomicFuts.remove(futVer);
+ @Nullable public IgniteInternalFuture<?> removeAtomicFuture(Long futId) {
+ return atomicFuts.remove(futId);
}
/**
@@ -481,6 +498,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
/**
* @param topVer Topology version.
+ * @return Future.
*/
public GridFutureAdapter addDataStreamerFuture(AffinityTopologyVersion topVer) {
final DataStreamerFuture fut = new DataStreamerFuture(topVer);
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index f7ac812..c7ac31a 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -859,7 +859,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (!ctx.clientNode() && !ctx.isDaemon())
addRemovedItemsCleanupTask(Long.getLong(IGNITE_CACHE_REMOVED_ENTRIES_TTL, 10_000));
-
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java
index 02c882c..c5d4066 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java
@@ -126,12 +126,10 @@ public class GridCacheReturn implements Externalizable, Message {
}
/**
- * Checks if value is not {@code null}.
*
- * @return {@code True} if value is not {@code null}.
*/
- public boolean hasValue() {
- return v != null;
+ public boolean emptyResult() {
+ return !invokeRes && v == null && cacheObj == null && success;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
index 7145dc2..5265ec9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
@@ -41,30 +41,31 @@ public abstract class GridDeferredAckMessageSender {
private GridTimeoutProcessor time;
/** Closure processor. */
- public GridClosureProcessor closure;
+ public GridClosureProcessor c;
/**
* @param time Time.
- * @param closure Closure.
+ * @param c Closure.
*/
public GridDeferredAckMessageSender(GridTimeoutProcessor time,
- GridClosureProcessor closure) {
+ GridClosureProcessor c) {
this.time = time;
- this.closure = closure;
+ this.c = c;
}
/**
- *
+ * @return Timeout.
*/
public abstract int getTimeout();
/**
- *
+ * @return Buffer size.
*/
public abstract int getBufferSize();
/**
- *
+ * @param nodeId Node ID.
+ * @param vers Versions to send.
*/
public abstract void finish(UUID nodeId, ConcurrentLinkedDeque8<GridCacheVersion> vers);
@@ -151,7 +152,7 @@ public abstract class GridDeferredAckMessageSender {
/** {@inheritDoc} */
@Override public void onTimeout() {
if (guard.compareAndSet(false, true)) {
- closure.runLocalSafe(new Runnable() {
+ c.runLocalSafe(new Runnable() {
@Override public void run() {
writeLock().lock();
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
index 146e554..4f8570c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
@@ -17,8 +17,10 @@
package org.apache.ignite.internal.processors.cache;
+import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
/**
@@ -29,7 +31,6 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheOb
private static final long serialVersionUID = 0L;
/** */
- @GridDirectTransient
private int part = -1;
/**
@@ -42,14 +43,6 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheOb
/**
* @param val Value.
* @param valBytes Value bytes.
- */
- public KeyCacheObjectImpl(Object val, byte[] valBytes) {
- this(val, valBytes, -1);
- }
-
- /**
- * @param val Value.
- * @param valBytes Value bytes.
* @param part Partition.
*/
public KeyCacheObjectImpl(Object val, byte[] valBytes, int part) {
@@ -130,7 +123,57 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheOb
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 1;
+ return 2;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 1:
+ part = reader.readInt("part");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(KeyCacheObjectImpl.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 1:
+ if (!writer.writeInt("part", part))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 2af822a..6ca15de 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -32,6 +32,7 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
@@ -409,6 +410,13 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
+ @Nullable @Override public List<ClusterNode> nodes(int p,
+ AffinityAssignment affAssignment,
+ List<ClusterNode> affNodes) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
@Override public List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) {
lock.readLock().lock();
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index bdd84b0..605150a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -23,6 +23,7 @@ import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
@@ -170,6 +171,14 @@ public interface GridDhtPartitionTopology {
/**
* @param p Partition ID.
+ * @param affAssignment Assignments.
+ * @param affNodes Nodes assigned to the given partition by affinity.
+ * @return Collection of all nodes responsible for this partition with primary node being first.
+ */
+ @Nullable public List<ClusterNode> nodes(int p, AffinityAssignment affAssignment, List<ClusterNode> affNodes);
+
+ /**
+ * @param p Partition ID.
* @return Collection of all nodes who {@code own} this partition.
*/
public List<ClusterNode> owners(int p);
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 49de280..53257d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -828,11 +828,32 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
+ @Nullable @Override public List<ClusterNode> nodes(int p,
+ AffinityAssignment affAssignment,
+ List<ClusterNode> affNodes) {
+ return nodes0(p, affAssignment, affNodes);
+ }
+
+ /** {@inheritDoc} */
@Override public List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) {
AffinityAssignment affAssignment = cctx.affinity().assignment(topVer);
List<ClusterNode> affNodes = affAssignment.get(p);
+ List<ClusterNode> nodes = nodes0(p, affAssignment, affNodes);
+
+ return nodes != null ? nodes : affNodes;
+ }
+
+ /**
+ * @param p Partition.
+ * @param affAssignment Assignments.
+ * @param affNodes Nodes assigned to the given partition by affinity.
+ * @return Nodes responsible for given partition (primary is first).
+ */
+ @Nullable private List<ClusterNode> nodes0(int p, AffinityAssignment affAssignment, List<ClusterNode> affNodes) {
+ AffinityTopologyVersion topVer = affAssignment.topologyVersion();
+
lock.readLock().lock();
try {
@@ -866,7 +887,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
}
- return nodes != null ? nodes : affNodes;
+ return nodes;
}
finally {
lock.readLock().unlock();
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index 4cb113e..5ff5aa4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -30,10 +31,13 @@ import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
+import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -41,14 +45,15 @@ import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
/**
* DHT atomic cache backup update future.
@@ -74,56 +79,38 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
protected final GridCacheContext cctx;
/** Future version. */
- protected final GridCacheVersion futVer;
-
- /** Completion callback. */
- @GridToStringExclude
- private final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb;
+ @GridToStringInclude
+ protected final long futId;
/** Update request. */
- protected final GridNearAtomicAbstractUpdateRequest updateReq;
-
- /** Update response. */
- final GridNearAtomicUpdateResponse updateRes;
+ final GridNearAtomicAbstractUpdateRequest updateReq;
/** Mappings. */
- @GridToStringInclude
+ @GridToStringExclude
protected Map<UUID, GridDhtAtomicAbstractUpdateRequest> mappings;
/** Continuous query closures. */
private Collection<CI1<Boolean>> cntQryClsrs;
- /** */
- private final boolean waitForExchange;
-
/** Response count. */
private volatile int resCnt;
/**
* @param cctx Cache context.
- * @param completionCb Callback to invoke when future is completed.
* @param writeVer Write version.
* @param updateReq Update request.
- * @param updateRes Update response.
*/
protected GridDhtAtomicAbstractUpdateFuture(
GridCacheContext cctx,
- CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
GridCacheVersion writeVer,
- GridNearAtomicAbstractUpdateRequest updateReq,
- GridNearAtomicUpdateResponse updateRes
+ GridNearAtomicAbstractUpdateRequest updateReq
) {
this.cctx = cctx;
- this.futVer = cctx.isLocalNode(updateRes.nodeId()) ?
- cctx.versions().next(updateReq.topologyVersion()) : // Generate new if request mapped to local.
- updateReq.futureVersion();
this.updateReq = updateReq;
- this.completionCb = completionCb;
- this.updateRes = updateRes;
this.writeVer = writeVer;
- waitForExchange = !(updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest()));
+ futId = cctx.mvcc().atomicFutureId();
if (log == null) {
msgLog = cctx.shared().atomicMessageLogger();
@@ -131,8 +118,15 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
}
}
+ /**
+ * @return {@code True} if all updates are sent to DHT.
+ */
+ protected abstract boolean sendAllToDht();
+
/** {@inheritDoc} */
@Override public final IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
+ boolean waitForExchange = !updateReq.topologyLocked();
+
if (waitForExchange && updateReq.topologyVersion().compareTo(topVer) < 0)
return this;
@@ -141,17 +135,23 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
/**
* @param clsr Continuous query closure.
+ * @param sync Synchronous continuous query flag.
*/
- public final void addContinuousQueryClosure(CI1<Boolean> clsr) {
+ public final void addContinuousQueryClosure(CI1<Boolean> clsr, boolean sync) {
assert !isDone() : this;
- if (cntQryClsrs == null)
- cntQryClsrs = new ArrayList<>(10);
+ if (sync)
+ clsr.apply(true);
+ else {
+ if (cntQryClsrs == null)
+ cntQryClsrs = new ArrayList<>(10);
- cntQryClsrs.add(clsr);
+ cntQryClsrs.add(clsr);
+ }
}
/**
+ * @param affAssignment Affinity assignment.
* @param entry Entry to map.
* @param val Value to write.
* @param entryProcessor Entry processor.
@@ -163,7 +163,9 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
* @param updateCntr Partition update counter.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
- final void addWriteEntry(GridDhtCacheEntry entry,
+ final void addWriteEntry(
+ AffinityAssignment affAssignment,
+ GridDhtCacheEntry entry,
@Nullable CacheObject val,
EntryProcessor<Object, Object, Object> entryProcessor,
long ttl,
@@ -174,7 +176,12 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
long updateCntr) {
AffinityTopologyVersion topVer = updateReq.topologyVersion();
- List<ClusterNode> dhtNodes = cctx.dht().topology().nodes(entry.partition(), topVer);
+ List<ClusterNode> affNodes = affAssignment.get(entry.partition());
+
+ List<ClusterNode> dhtNodes = cctx.dht().topology().nodes(entry.partition(), affAssignment, affNodes);
+
+ if (dhtNodes == null)
+ dhtNodes = affNodes;
if (log.isDebugEnabled())
log.debug("Mapping entry to DHT nodes [nodes=" + U.nodeIds(dhtNodes) + ", entry=" + entry + ']');
@@ -193,8 +200,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
if (updateReq == null) {
updateReq = createRequest(
- node,
- futVer,
+ node.id(),
+ futId,
writeVer,
syncMode,
topVer,
@@ -212,7 +219,6 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
conflictExpireTime,
conflictVer,
addPrevVal,
- entry.partition(),
prevVal,
updateCntr);
}
@@ -239,7 +245,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
* @param ttl TTL for near cache update (optional).
* @param expireTime Expire time for near cache update (optional).
*/
- final void addNearWriteEntries(Collection<UUID> readers,
+ final void addNearWriteEntries(
+ Collection<UUID> readers,
GridDhtCacheEntry entry,
@Nullable CacheObject val,
EntryProcessor<Object, Object, Object> entryProcessor,
@@ -262,8 +269,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
continue;
updateReq = createRequest(
- node,
- futVer,
+ node.id(),
+ futId,
writeVer,
syncMode,
topVer,
@@ -274,8 +281,6 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
mappings.put(nodeId, updateReq);
}
- addNearReaderEntry(entry);
-
updateReq.addNearWriteValue(entry.key(),
val,
entryProcessor,
@@ -284,12 +289,15 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
}
}
- /**
- * adds new nearReader.
- *
- * @param entry GridDhtCacheEntry.
- */
- protected abstract void addNearReaderEntry(GridDhtCacheEntry entry);
+ /** {@inheritDoc} */
+ @Override public final IgniteUuid futureId() {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public final Long id() {
+ return futId;
+ }
/**
* @return Write version.
@@ -299,21 +307,11 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
}
/** {@inheritDoc} */
- @Override public final IgniteUuid futureId() {
- return futVer.asGridUuid();
- }
-
- /** {@inheritDoc} */
- @Override public final GridCacheVersion version() {
- return futVer;
- }
-
- /** {@inheritDoc} */
@Override public final boolean onNodeLeft(UUID nodeId) {
boolean res = registerResponse(nodeId);
if (res && msgLog.isDebugEnabled()) {
- msgLog.debug("DTH update fut, node left [futId=" + futVer + ", writeVer=" + writeVer +
+ msgLog.debug("DTH update fut, node left [futId=" + futId + ", writeVer=" + writeVer +
", node=" + nodeId + ']');
}
@@ -324,7 +322,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
* @param nodeId Node ID.
* @return {@code True} if request found.
*/
- final boolean registerResponse(UUID nodeId) {
+ private boolean registerResponse(UUID nodeId) {
int resCnt0;
GridDhtAtomicAbstractUpdateRequest req = mappings != null ? mappings.get(nodeId) : null;
@@ -353,41 +351,103 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
/**
* Sends requests to remote nodes.
+ *
+ * @param nearNode Near node.
+ * @param ret Cache operation return value.
+ * @param updateRes Response.
+ * @param completionCb Callback to invoke to send response to near node.
+ */
+ final void map(ClusterNode nearNode,
+ GridCacheReturn ret,
+ GridNearAtomicUpdateResponse updateRes,
+ GridDhtAtomicCache.UpdateReplyClosure completionCb) {
+ if (F.isEmpty(mappings)) {
+ updateRes.dhtNodes(Collections.<UUID>emptyList());
+
+ completionCb.apply(updateReq, updateRes);
+
+ onDone();
+
+ return;
+ }
+
+ boolean needReplyToNear = updateReq.writeSynchronizationMode() == PRIMARY_SYNC ||
+ !ret.emptyResult() ||
+ updateRes.nearVersion() != null ||
+ cctx.localNodeId().equals(nearNode.id());
+
+ boolean needMapping = updateReq.fullSync() && (updateReq.needPrimaryResponse() || !sendAllToDht());
+
+ if (needMapping) {
+ initMapping(updateRes);
+
+ needReplyToNear = true;
+ }
+
+ sendDhtRequests(nearNode, ret);
+
+ if (needReplyToNear)
+ completionCb.apply(updateReq, updateRes);
+ }
+
+ /**
+ * @param updateRes Response.
*/
- final void map() {
+ private void initMapping(GridNearAtomicUpdateResponse updateRes) {
+ List<UUID> dhtNodes;
+
if (!F.isEmpty(mappings)) {
- for (GridDhtAtomicAbstractUpdateRequest req : mappings.values()) {
- try {
- cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
+ dhtNodes = new ArrayList<>(mappings.size());
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("DTH update fut, sent request [futId=" + futVer +
- ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']');
- }
+ dhtNodes.addAll(mappings.keySet());
+ }
+ else
+ dhtNodes = Collections.emptyList();
+
+ updateRes.dhtNodes(dhtNodes);
+ }
+
+ /**
+ * @param nearNode Near node.
+ * @param ret Return value.
+ */
+ private void sendDhtRequests(ClusterNode nearNode, GridCacheReturn ret) {
+ for (GridDhtAtomicAbstractUpdateRequest req : mappings.values()) {
+ try {
+ assert !cctx.localNodeId().equals(req.nodeId()) : req;
+
+ if (updateReq.fullSync()) {
+ req.nearReplyInfo(nearNode.id(), updateReq.futureId());
+
+ if (ret.emptyResult())
+ req.hasResult(true);
}
- catch (ClusterTopologyCheckedException ignored) {
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("DTH update fut, failed to send request, node left [futId=" + futVer +
- ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']');
- }
- registerResponse(req.nodeId());
+ if (cntQryClsrs != null)
+ req.replyWithoutDelay(true);
+
+ cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
+
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("DTH update fut, sent request [futId=" + futId +
+ ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']');
}
- catch (IgniteCheckedException ignored) {
- U.error(msgLog, "Failed to send request [futId=" + futVer +
+ }
+ catch (ClusterTopologyCheckedException ignored) {
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("DTH update fut, failed to send request, node left [futId=" + futId +
", writeVer=" + writeVer + ", node=" + req.nodeId() + ']');
-
- registerResponse(req.nodeId());
}
+
+ registerResponse(req.nodeId());
}
- }
- else
- onDone();
+ catch (IgniteCheckedException ignored) {
+ U.error(msgLog, "Failed to send request [futId=" + futId +
+ ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']');
- // Send response right away if no ACKs from backup is required.
- // Backups will send ACKs anyway, future will be completed after all backups have replied.
- if (updateReq.writeSynchronizationMode() != FULL_SYNC)
- completionCb.apply(updateReq, updateRes);
+ registerResponse(req.nodeId());
+ }
+ }
}
/**
@@ -395,7 +455,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
*
* @param nodeId Backup node ID.
*/
- public final void onResult(UUID nodeId) {
+ final void onDeferredResponse(UUID nodeId) {
if (log.isDebugEnabled())
log.debug("Received deferred DHT atomic update future result [nodeId=" + nodeId + ']');
@@ -403,8 +463,31 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
}
/**
- * @param node Node.
- * @param futVer Future version.
+ * Handles a DHT update response received from the given node.
+ * <p>
+ * For each key returned by {@code res.nearEvicted()}, the entry is looked up with
+ * {@code cctx.cache().peekEx(key)} and, if one is found, the responding node is removed from it
+ * via {@code removeReader(nodeId, res.messageId())}. A {@code GridCacheEntryRemovedException}
+ * raised for a key is only logged at debug level. Finally {@code registerResponse(nodeId)} is
+ * called, whether or not any keys were reported.
+ *
+ * @param nodeId Node ID.
+ * @param res Response.
+ */
+ final void onDhtResponse(UUID nodeId, GridDhtAtomicUpdateResponse res) {
+ if (!F.isEmpty(res.nearEvicted())) {
+ for (KeyCacheObject key : res.nearEvicted()) {
+ try {
+ GridDhtCacheEntry entry = (GridDhtCacheEntry)cctx.cache().peekEx(key);
+
+ if (entry != null)
+ entry.removeReader(nodeId, res.messageId());
+ }
+ catch (GridCacheEntryRemovedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Entry with evicted reader was removed [key=" + key + ", err=" + e + ']');
+ }
+ }
+ }
+
+ registerResponse(nodeId);
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param futId Future ID.
* @param writeVer Update version.
* @param syncMode Write synchronization mode.
* @param topVer Topology version.
@@ -414,8 +497,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
* @return Request.
*/
protected abstract GridDhtAtomicAbstractUpdateRequest createRequest(
- ClusterNode node,
- GridCacheVersion futVer,
+ UUID nodeId,
+ long futId,
GridCacheVersion writeVer,
CacheWriteSynchronizationMode syncMode,
@NotNull AffinityTopologyVersion topVer,
@@ -424,38 +507,18 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
@Nullable GridCacheVersion conflictVer
);
- /**
- * Callback for backup update response.
- *
- * @param nodeId Backup node ID.
- * @param updateRes Update response.
- */
- public abstract void onResult(UUID nodeId, GridDhtAtomicUpdateResponse updateRes);
-
- /**
- * @param updateRes Response.
- * @param err Error.
- */
- protected abstract void addFailedKeys(GridNearAtomicUpdateResponse updateRes, Throwable err);
-
/** {@inheritDoc} */
@Override public final boolean onDone(@Nullable Void res, @Nullable Throwable err) {
if (super.onDone(res, err)) {
- cctx.mvcc().removeAtomicFuture(version());
+ cctx.mvcc().removeAtomicFuture(futId);
boolean suc = err == null;
- if (!suc)
- addFailedKeys(updateRes, err);
-
if (cntQryClsrs != null) {
for (CI1<Boolean> clsr : cntQryClsrs)
clsr.apply(suc);
}
- if (updateReq.writeSynchronizationMode() == FULL_SYNC)
- completionCb.apply(updateReq, updateRes);
-
return true;
}
@@ -471,4 +534,21 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
@Override public void markNotTrackable() {
// No-op: this implementation takes no action when asked to be marked as not trackable.
}
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * The string is built while holding this future's monitor. In addition to what
+ * {@code S.toString(...)} renders for this class, a value named {@code dhtRes} is passed to it:
+ * a view over {@code mappings} (created with {@code F.viewReadOnly}) in which every request is
+ * shown as {@code [res=<hasResponse>, size=<size>, nearSize=<nearSize>]}.
+ */
+ @Override public String toString() {
+ synchronized (this) {
+ Map<UUID, String> dhtRes = F.viewReadOnly(mappings,
+ new IgniteClosure<GridDhtAtomicAbstractUpdateRequest, String>() {
+ @Override public String apply(GridDhtAtomicAbstractUpdateRequest req) {
+ return "[res=" + req.hasResponse() +
+ ", size=" + req.size() +
+ ", nearSize=" + req.nearSize() + ']';
+ }
+ }
+ );
+
+ return S.toString(GridDhtAtomicAbstractUpdateFuture.class, this, "dhtRes", dhtRes);
+ }
+ }
}