You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/17 15:04:47 UTC
[02/10] ignite git commit: ignite-4680-2
ignite-4680-2
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d1b4ebd4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d1b4ebd4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d1b4ebd4
Branch: refs/heads/ignite-4680-sb
Commit: d1b4ebd48290af5fb3a2d0d9df57219dd4ce1edb
Parents: 8e5e3cb
Author: Konstantin Dudkov <kd...@ya.ru>
Authored: Wed Mar 15 19:41:49 2017 +0300
Committer: Konstantin Dudkov <kd...@ya.ru>
Committed: Wed Mar 15 19:41:49 2017 +0300
----------------------------------------------------------------------
.../apache/ignite/internal/IgniteKernal.java | 6 +-
.../ignite/internal/IgniteNodeAttributes.java | 3 +
.../managers/communication/GridIoManager.java | 23 +-
.../cache/GridCacheAffinityManager.java | 21 +-
.../processors/cache/GridCacheAtomicFuture.java | 2 +-
.../cache/GridCacheEvictionManager.java | 2 +-
.../processors/cache/GridCacheIoManager.java | 6 +-
.../cache/GridCacheManagerAdapter.java | 2 +-
.../processors/cache/GridCacheMvccManager.java | 14 +-
.../GridDhtAtomicAbstractUpdateFuture.java | 2 +-
.../dht/atomic/GridDhtAtomicCache.java | 184 ++++++++---
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 5 +-
.../GridNearAtomicAbstractUpdateFuture.java | 102 +++++-
.../GridNearAtomicAbstractUpdateRequest.java | 46 +--
.../atomic/GridNearAtomicFullUpdateRequest.java | 127 ++++++-
.../GridNearAtomicSingleUpdateFuture.java | 25 +-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 72 ++--
.../atomic/GridNearAtomicUpdateResponse.java | 40 ++-
.../distributed/near/GridNearAtomicCache.java | 27 +-
.../cache/version/GridCacheVersionManager.java | 47 +++
.../apache/ignite/internal/util/MPSCQueue.java | 331 +++++++++++++++++++
.../ignite/internal/util/StripedExecutor.java | 125 ++++++-
.../ignite/thread/IgniteStripeThread.java | 47 +++
.../GridNearAtomicFullUpdateRequestTest.java | 74 +++++
.../dht/atomic/MarshallingAbstractTest.java | 156 +++++++++
.../version/GridCacheVersionManagerTest.java | 86 +++++
26 files changed, 1380 insertions(+), 195 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 2a6706e..d37e4fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -227,6 +227,7 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_PREFIX;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_RESTART_ENABLED;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_REST_PORT_RANGE;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_SPI_CLASS;
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_STRIPES_CNT;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_USER_NAME;
import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR;
import static org.apache.ignite.internal.IgniteVersionUtils.BUILD_TSTAMP_STR;
@@ -1167,7 +1168,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
pubPoolIdleThreads + ", qSize=" + pubPoolQSize + "]" + NL +
" ^-- System thread pool [active=" + sysPoolActiveThreads + ", idle=" +
sysPoolIdleThreads + ", qSize=" + sysPoolQSize + "]" + NL +
- " ^-- Outbound messages queue [size=" + m.getOutboundMessagesQueueSize() + "]";
+ " ^-- Outbound messages queue [size=" + m.getOutboundMessagesQueueSize() + "]" + NL +
+ " ^-- Striped executor [" + ctx.getStripedExecutorService().getMetrics() + "]";
log.info(msg);
}
@@ -1440,6 +1442,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
add(ATTR_CONSISTENCY_CHECK_SKIPPED, getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK));
+ add(ATTR_STRIPES_CNT, ctx.getStripedExecutorService() != null ? ctx.getStripedExecutorService().stripes() : -1);
+
if (cfg.getConsistentId() != null)
add(ATTR_NODE_CONSISTENT_ID, cfg.getConsistentId());
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
index 8294026..f1a2558 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
@@ -168,6 +168,9 @@ public final class IgniteNodeAttributes {
/** Ignite services compatibility mode (can be {@code null}). */
public static final String ATTR_SERVICES_COMPATIBILITY_MODE = ATTR_PREFIX + ".services.compatibility.enabled";
+ /** Number of stripes in the node's striped executor ({@code -1} if the node has no striped executor). */
+ public static final String ATTR_STRIPES_CNT = ATTR_PREFIX + ".cache.stripes.cnt";
+
/**
* Enforces singleton.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 23738d7..5fd2845 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -54,11 +54,13 @@ import org.apache.ignite.internal.managers.GridManagerAdapter;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
import org.apache.ignite.internal.processors.pool.PoolProcessor;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
+import org.apache.ignite.internal.util.StripedExecutor;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridTuple3;
@@ -97,9 +99,9 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IGF
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.P2P_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.QUERY_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UTILITY_CACHE_POOL;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.QUERY_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.isReservedGridIoPolicy;
import static org.apache.ignite.internal.util.nio.GridNioBackPressureControl.threadProcessingMessage;
import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q_OPTIMIZED_RMV;
@@ -807,19 +809,32 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
}
};
+ StripedExecutor stripedExecutor = ctx.getStripedExecutorService();
+
if (msg.topicOrdinal() == TOPIC_IO_TEST.ordinal()) {
IgniteIoTestMessage msg0 = (IgniteIoTestMessage)msg.message();
if (msg0.processFromNioThread())
c.run();
else
- ctx.getStripedExecutorService().execute(-1, c);
+ stripedExecutor.execute(-1, c);
return;
}
- if (plc == GridIoPolicy.SYSTEM_POOL && msg.partition() != Integer.MIN_VALUE) {
- ctx.getStripedExecutorService().execute(msg.partition(), c);
+ if (plc == GridIoPolicy.SYSTEM_POOL &&
+ (msg.partition() != Integer.MIN_VALUE ||
+ msg.message().directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE)) {
+ Map<Integer, int[]> stripemap = msg.message().directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE ?
+ ((GridNearAtomicFullUpdateRequest)msg.message()).stripeMap() : null;
+
+ if (stripemap != null) {
+ for (Integer stripe : stripemap.keySet()) {
+ stripedExecutor.execute(stripe, c);
+ }
+ }
+ else
+ stripedExecutor.execute(msg.partition(), c);
return;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
index 17c9319..621634c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
@@ -17,6 +17,11 @@
package org.apache.ignite.internal.processors.cache;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.binary.BinaryObject;
@@ -33,12 +38,6 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteFuture;
import org.jetbrains.annotations.Nullable;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-
/**
* Cache affinity manager.
*/
@@ -88,10 +87,15 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter {
*
*/
public void cancelFutures() {
+ if (!starting.get())
+ // Ignoring attempt to stop manager that has never been started.
+ return;
+
IgniteCheckedException err =
new IgniteCheckedException("Failed to wait for topology update, cache (or node) is stopping.");
- aff.cancelFutures(err);
+ if (aff != null)
+ aff.cancelFutures(err);
}
/** {@inheritDoc} */
@@ -99,7 +103,8 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter {
IgniteCheckedException err = new IgniteClientDisconnectedCheckedException(reconnectFut,
"Failed to wait for topology update, client disconnected.");
- aff.cancelFutures(err);
+ if (aff != null)
+ aff.cancelFutures(err);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
index 8df229e..87ae29c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
@@ -27,7 +27,7 @@ public interface GridCacheAtomicFuture<R> extends GridCacheFuture<R> {
/**
* @return Future ID.
*/
- public Long id();
+ public long id();
/**
* Gets future that will be completed when it is safe when update is finished on the given version of topology.
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
index 06c707e..e76b773 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
@@ -1966,7 +1966,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
if (lock.readLock().tryLock()) {
try {
if (log.isDebugEnabled())
- log.debug("Entered to eviction future onResponse() [fut=" + this + ", node=" + nodeId +
+ log.debug("Entered to eviction future storeResponse() [fut=" + this + ", node=" + nodeId +
", res=" + res + ']');
ClusterNode node = cctx.node(nodeId);
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 99878ec..3cf68d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -431,7 +431,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
append(", dhtTxId=").append(dhtTxId(cacheMsg)).
append(", msg=").append(cacheMsg);
}
- else if (atomicFututeId(cacheMsg) != null) {
+ else if (atomicFututeId(cacheMsg) > -1) {
builder.append("futId=").append(atomicFututeId(cacheMsg)).
append(", writeVer=").append(atomicWriteVersion(cacheMsg)).
append(", msg=").append(cacheMsg);
@@ -484,7 +484,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
* @param cacheMsg Cache message.
* @return Atomic future ID if applicable for message.
*/
- @Nullable private Long atomicFututeId(GridCacheMessage cacheMsg) {
+ @Nullable private long atomicFututeId(GridCacheMessage cacheMsg) {
if (cacheMsg instanceof GridNearAtomicAbstractUpdateRequest)
return ((GridNearAtomicAbstractUpdateRequest)cacheMsg).futureId();
else if (cacheMsg instanceof GridNearAtomicUpdateResponse)
@@ -496,7 +496,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
else if (cacheMsg instanceof GridNearAtomicCheckUpdateRequest)
return ((GridNearAtomicCheckUpdateRequest)cacheMsg).futureId();
- return null;
+ return -1;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManagerAdapter.java
index 8ad0ea8..ab965de 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManagerAdapter.java
@@ -34,7 +34,7 @@ public class GridCacheManagerAdapter<K, V> implements GridCacheManager<K, V> {
protected IgniteLogger log;
/** Starting flag. */
- private final AtomicBoolean starting = new AtomicBoolean(false);
+ protected final AtomicBoolean starting = new AtomicBoolean(false);
/** {@inheritDoc} */
@Override public final void start(GridCacheContext<K, V> cctx) throws IgniteCheckedException {
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index dff2c88..c4cbefd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -132,7 +132,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
};
/** Logger. */
- @SuppressWarnings( {"FieldAccessedSynchronizedAndUnsynchronized"})
+ @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
private IgniteLogger exchLog;
/** */
@@ -256,9 +256,9 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
cacheFut.onNodeLeft(discoEvt.eventNode().id());
if (cacheFut.isCancelled() || cacheFut.isDone()) {
- Long futId = cacheFut.id();
+ long futId = cacheFut.id();
- if (futId != null)
+ if (futId > -1)
atomicFuts.remove(futId, cacheFut);
}
}
@@ -437,7 +437,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
* @param fut Future.
* @return {@code False} if future was forcibly completed with error.
*/
- public boolean addAtomicFuture(Long futId, GridCacheAtomicFuture<?> fut) {
+ public boolean addAtomicFuture(long futId, GridCacheAtomicFuture<?> fut) {
IgniteInternalFuture<?> old = atomicFuts.put(futId, fut);
assert old == null : "Old future is not null [futId=" + futId + ", fut=" + fut + ", old=" + old + ']';
@@ -472,7 +472,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
* @param futId Future ID.
* @return Future.
*/
- @Nullable public IgniteInternalFuture<?> atomicFuture(Long futId) {
+ @Nullable public IgniteInternalFuture<?> atomicFuture(long futId) {
return atomicFuts.get(futId);
}
@@ -480,7 +480,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
* @param futId Future ID.
* @return Removed future.
*/
- @Nullable public IgniteInternalFuture<?> removeAtomicFuture(Long futId) {
+ @Nullable public IgniteInternalFuture<?> removeAtomicFuture(long futId) {
return atomicFuts.remove(futId);
}
@@ -511,8 +511,6 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
}
/**
-
- /**
* Adds future.
*
* @param fut Future.
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index 5ff5aa4..17ee298 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -295,7 +295,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
}
/** {@inheritDoc} */
- @Override public final Long id() {
+ public final long id() {
return futId;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index c20ed48..4ab5000 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -101,6 +101,7 @@ import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.security.SecurityPermission;
+import org.apache.ignite.thread.IgniteStripeThread;
import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.Nullable;
@@ -278,9 +279,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
UUID nodeId,
GridNearAtomicAbstractUpdateRequest req
) {
+ int stripeIdx;
+
+ if (req instanceof GridNearAtomicFullUpdateRequest && Thread.currentThread() instanceof IgniteStripeThread)
+ stripeIdx = ((IgniteStripeThread)Thread.currentThread()).stripeIndex();
+ else
+ stripeIdx = IgniteStripeThread.GRP_IDX_UNASSIGNED;
+
processNearAtomicUpdateRequest(
nodeId,
- req);
+ req,
+ stripeIdx);
}
@Override public String toString() {
@@ -1640,6 +1649,21 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
/**
+ * Executes local update without a stripe assignment (delegates with {@code IgniteStripeThread.GRP_IDX_UNASSIGNED}).
+ *
+ * @param nodeId Node ID.
+ * @param req Update request.
+ * @param completionCb Completion callback.
+ */
+ public void updateAllAsyncInternal(
+ final UUID nodeId,
+ final GridNearAtomicAbstractUpdateRequest req,
+ final UpdateReplyClosure completionCb
+ ) {
+ updateAllAsyncInternal(nodeId, req, IgniteStripeThread.GRP_IDX_UNASSIGNED, completionCb);
+ }
+
+ /**
* Executes local update.
*
* @param nodeId Node ID.
@@ -1649,6 +1673,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
void updateAllAsyncInternal(
final UUID nodeId,
final GridNearAtomicAbstractUpdateRequest req,
+ final int stripeIdx,
final UpdateReplyClosure completionCb
) {
IgniteInternalFuture<Object> forceFut = preldr.request(req, req.topologyVersion());
@@ -1667,7 +1692,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
return;
}
- updateAllAsyncInternal0(nodeId, req, completionCb);
+ updateAllAsyncInternal0(nodeId, req, stripeIdx, completionCb);
}
else {
forceFut.listen(new CI1<IgniteInternalFuture<Object>>() {
@@ -1684,7 +1709,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
return;
}
- updateAllAsyncInternal0(nodeId, req, completionCb);
+ updateAllAsyncInternal0(nodeId, req, stripeIdx, completionCb);
}
});
}
@@ -1718,11 +1743,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
*
* @param nodeId Node ID.
* @param req Update request.
+ * @param stripeIdx Stripe index.
* @param completionCb Completion callback.
*/
private void updateAllAsyncInternal0(
UUID nodeId,
GridNearAtomicAbstractUpdateRequest req,
+ int stripeIdx,
UpdateReplyClosure completionCb
) {
ClusterNode node = ctx.discovery().node(nodeId);
@@ -1741,6 +1768,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
false,
ctx.deploymentEnabled());
+ res.partition(req.partition());
+
+ int[] stripeIdxs = null;
+
+ if (stripeIdx != IgniteStripeThread.GRP_IDX_UNASSIGNED
+ && req.directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE
+ && ((GridNearAtomicFullUpdateRequest)req).stripeMap() != null) {
+ stripeIdxs = ((GridNearAtomicFullUpdateRequest)req).stripeMap().get(stripeIdx);
+
+ res.stripe(stripeIdx);
+ }
+
assert !req.returnValue() || (req.operation() == TRANSFORM || req.size() == 1);
GridDhtAtomicAbstractUpdateFuture dhtFut = null;
@@ -1765,7 +1804,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
try {
if (top.stopping()) {
- res.addFailedKeys(req.keys(), new IgniteCheckedException("Failed to perform cache operation " +
+ addAllKeysAsFailed(req, res, stripeIdxs, new IgniteCheckedException("Failed to perform cache operation " +
"(cache is stopped): " + name()));
completionCb.apply(req, res);
@@ -1776,7 +1815,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
// Do not check topology version if topology was locked on near node by
// external transaction or explicit lock.
if (req.topologyLocked() || !needRemap(req.topologyVersion(), top.topologyVersion())) {
- locked = lockEntries(req, req.topologyVersion());
+ locked = lockEntries(req, req.topologyVersion(), stripeIdxs);
boolean hasNear = ctx.discovery().cacheNearNode(node, name());
@@ -1795,13 +1834,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
boolean sndPrevVal = !top.rebalanceFinished(req.topologyVersion());
- dhtFut = createDhtFuture(ver, req);
+ int size = stripeIdxs == null ? req.size() : stripeIdxs.length;
+
+ dhtFut = createDhtFuture(ver, req, size);
expiry = expiryPolicy(req.expiry());
GridCacheReturn retVal = null;
- if (req.size() > 1 && // Several keys ...
+ if (size > 1 && // Several keys ...
writeThrough() && !req.skipStore() && // and store is enabled ...
!ctx.store().isLocal() && // and this is not local store ...
// (conflict resolver should be used for local store)
@@ -1818,7 +1859,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
ctx.isDrEnabled(),
taskName,
expiry,
- sndPrevVal);
+ sndPrevVal,
+ stripeIdxs);
deleted = updRes.deleted();
dhtFut = updRes.dhtFuture();
@@ -1837,7 +1879,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
ctx.isDrEnabled(),
taskName,
expiry,
- sndPrevVal);
+ sndPrevVal,
+ stripeIdxs);
retVal = updRes.returnValue();
deleted = updRes.deleted();
@@ -1895,7 +1938,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
// an attempt to use cleaned resources.
U.error(log, "Unexpected exception during cache update", e);
- res.addFailedKeys(req.keys(), e);
+ addAllKeysAsFailed(req, res, stripeIdxs, e);
completionCb.apply(req, res);
@@ -1921,6 +1964,27 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
/**
+ * Adds all keys as failed to response.
+ *
+ * @param req Request.
+ * @param res Response.
+ * @param stripeIdx Indexes of the request keys to mark as failed, or {@code null} to mark all request keys.
+ * @param e Throwable.
+ */
+ private void addAllKeysAsFailed(GridNearAtomicAbstractUpdateRequest req,
+ GridNearAtomicUpdateResponse res,
+ int[] stripeIdx,
+ Throwable e) {
+
+ if (stripeIdx == null)
+ res.addFailedKeys(req.keys(), e);
+ else {
+ for (int i = 0; i < stripeIdx.length; i++)
+ res.addFailedKey(req.key(stripeIdx[i]), e);
+ }
+ }
+
+ /**
* Updates locked entries using batched write-through.
*
* @param node Sender node.
@@ -1934,6 +1998,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param taskName Task name.
* @param expiry Expiry policy.
* @param sndPrevVal If {@code true} sends previous value to backups.
+ * @param stripeIdxs Indexes into the request key list to process, or {@code null} to process every key.
* @return Deleted entries.
* @throws GridCacheEntryRemovedException Should not be thrown.
*/
@@ -1949,7 +2014,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
final boolean replicate,
final String taskName,
@Nullable final IgniteCacheExpiryPolicy expiry,
- final boolean sndPrevVal
+ final boolean sndPrevVal,
+ final int[] stripeIdxs
) throws GridCacheEntryRemovedException {
assert !ctx.dr().receiveEnabled(); // Cannot update in batches during DR due to possible conflicts.
assert !req.returnValue() || req.operation() == TRANSFORM; // Should not request return values for putAll.
@@ -1959,13 +2025,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
reloadIfNeeded(locked);
}
catch (IgniteCheckedException e) {
- res.addFailedKeys(req.keys(), e);
+ addAllKeysAsFailed(req, res, stripeIdxs, e);
return new UpdateBatchResult();
}
}
- int size = req.size();
+ int size = stripeIdxs == null ? req.size() : stripeIdxs.length;
Map<KeyCacheObject, CacheObject> putMap = null;
@@ -1990,6 +2056,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
for (int i = 0; i < locked.size(); i++) {
GridDhtCacheEntry entry = locked.get(i);
+ int trueIdx = stripeIdxs == null ? i : stripeIdxs[i];
+
try {
if (!checkFilter(entry, req, res)) {
if (expiry != null && entry.hasValue()) {
@@ -2017,7 +2085,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
if (op == TRANSFORM) {
- EntryProcessor<Object, Object, Object> entryProcessor = req.entryProcessor(i);
+ EntryProcessor<Object, Object, Object> entryProcessor = req.entryProcessor(trueIdx);
CacheObject old = entry.innerGet(
ver,
@@ -2099,7 +2167,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
updRes,
taskName,
expiry,
- sndPrevVal);
+ sndPrevVal,
+ stripeIdxs);
firstEntryIdx = i;
@@ -2147,7 +2216,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
updRes,
taskName,
expiry,
- sndPrevVal);
+ sndPrevVal,
+ stripeIdxs);
firstEntryIdx = i;
@@ -2172,12 +2242,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
entryProcessorMap.put(entry.key(), entryProcessor);
}
else if (op == UPDATE) {
- CacheObject updated = req.value(i);
+ CacheObject updated = req.value(trueIdx);
if (intercept) {
CacheObject old = entry.innerGet(
- null,
- null,
+ null,
+ null,
/*read swap*/true,
/*read through*/ctx.loadPreviousValue(),
/*metrics*/true,
@@ -2273,7 +2343,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
updRes,
taskName,
expiry,
- sndPrevVal);
+ sndPrevVal,
+ stripeIdxs);
}
else
assert filtered.isEmpty();
@@ -2350,6 +2421,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param taskName Task name.
* @param expiry Expiry policy.
* @param sndPrevVal If {@code true} sends previous value to backups.
+ * @param stripeIdxs Indexes into the request key list to process, or {@code null} to process every key.
* @return Return value.
* @throws GridCacheEntryRemovedException Should be never thrown.
*/
@@ -2364,7 +2436,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
boolean replicate,
String taskName,
@Nullable IgniteCacheExpiryPolicy expiry,
- boolean sndPrevVal
+ boolean sndPrevVal,
+ int[] stripeIdxs
) throws GridCacheEntryRemovedException {
GridCacheReturn retVal = null;
Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null;
@@ -2377,9 +2450,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
AffinityAssignment affAssignment = ctx.affinity().assignment(topVer);
+ int keyNum = stripeIdxs == null ? req.size() : stripeIdxs.length;
+
// Avoid iterator creation.
- for (int i = 0; i < req.size(); i++) {
- KeyCacheObject k = req.key(i);
+ for (int i = 0; i < keyNum; i++) {
+ int trueIdx = stripeIdxs == null ? i : stripeIdxs[i];
+
+ KeyCacheObject k = req.key(trueIdx);
GridCacheOperation op = req.operation();
@@ -2388,13 +2465,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
try {
GridDhtCacheEntry entry = locked.get(i);
- GridCacheVersion newConflictVer = req.conflictVersion(i);
- long newConflictTtl = req.conflictTtl(i);
- long newConflictExpireTime = req.conflictExpireTime(i);
+ GridCacheVersion newConflictVer = req.conflictVersion(trueIdx);
+ long newConflictTtl = req.conflictTtl(trueIdx);
+ long newConflictExpireTime = req.conflictExpireTime(trueIdx);
assert !(newConflictVer instanceof GridCacheVersionEx) : newConflictVer;
- Object writeVal = op == TRANSFORM ? req.entryProcessor(i) : req.writeValue(i);
+ Object writeVal = op == TRANSFORM ? req.entryProcessor(trueIdx) : req.writeValue(trueIdx);
Collection<UUID> readers = null;
Collection<UUID> filteredReaders = null;
@@ -2478,13 +2555,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (!ctx.affinity().partitionBelongs(nearNode, entry.partition(), topVer)) {
// If put the same value as in request then do not need to send it back.
if (op == TRANSFORM || writeVal != updRes.newValue()) {
- res.addNearValue(i,
+ res.addNearValue(trueIdx,
updRes.newValue(),
updRes.newTtl(),
updRes.conflictExpireTime());
}
else
- res.addNearTtl(i, updRes.newTtl(), updRes.conflictExpireTime());
+ res.addNearTtl(trueIdx, updRes.newTtl(), updRes.conflictExpireTime());
if (updRes.newValue() != null) {
IgniteInternalFuture<Boolean> f = entry.addReader(nearNode.id(), req.messageId(), topVer);
@@ -2495,10 +2572,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
else if (F.contains(readers, nearNode.id())) // Reader became primary or backup.
entry.removeReader(nearNode.id(), req.messageId());
else
- res.addSkippedIndex(i);
+ res.addSkippedIndex(trueIdx);
}
else
- res.addSkippedIndex(i);
+ res.addSkippedIndex(trueIdx);
}
if (updRes.removeVersion() != null) {
@@ -2548,7 +2625,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/**
* @param hasNear {@code True} if originating node has near cache.
- * @param firstEntryIdx Index of the first entry in the request keys collection.
+ * @param firstEntryIdx Index of the first entry in the request keys collection, or in {@code stripeIdxs} when it is not {@code null}.
* @param entries Entries to update.
* @param ver Version to set.
* @param nearNode Originating node.
@@ -2584,7 +2661,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
final UpdateBatchResult batchRes,
final String taskName,
@Nullable final IgniteCacheExpiryPolicy expiry,
- final boolean sndPrevVal
+ final boolean sndPrevVal,
+ final int[] stripeIdxs
) {
assert putMap == null ^ rmvKeys == null;
@@ -2737,17 +2815,20 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
if (hasNear) {
+ // Index of this entry within the full key list of the request.
+ int trueIdx = stripeIdxs == null ? firstEntryIdx + i : stripeIdxs[firstEntryIdx + i];
+
if (!ctx.affinity().partitionBelongs(nearNode, entry.partition(), topVer)) {
int idx = firstEntryIdx + i;
if (req.operation() == TRANSFORM) {
- res.addNearValue(idx,
+ res.addNearValue(trueIdx,
writeVal,
updRes.newTtl(),
CU.EXPIRE_TIME_CALCULATE);
}
else
- res.addNearTtl(idx, updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE);
+ res.addNearTtl(trueIdx, updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE);
if (writeVal != null || entry.hasValue()) {
IgniteInternalFuture<Boolean> f = entry.addReader(nearNode.id(), req.messageId(), topVer);
@@ -2758,7 +2839,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
else if (readers.contains(nearNode.id())) // Reader became primary or backup.
entry.removeReader(nearNode.id(), req.messageId());
else
- res.addSkippedIndex(firstEntryIdx + i);
+ res.addSkippedIndex(trueIdx);
}
}
catch (GridCacheEntryRemovedException e) {
@@ -2789,12 +2870,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
*
* @param req Request with keys to lock.
* @param topVer Topology version to lock on.
+ * @param stripeIdxs Indexes (into the request key list) of the keys to lock, or {@code null} to lock all request keys.
* @return Collection of locked entries.
* @throws GridDhtInvalidPartitionException If entry does not belong to local node. If exception is thrown,
* locks are released.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
- private List<GridDhtCacheEntry> lockEntries(GridNearAtomicAbstractUpdateRequest req, AffinityTopologyVersion topVer)
+ private List<GridDhtCacheEntry> lockEntries(GridNearAtomicAbstractUpdateRequest req,
+ AffinityTopologyVersion topVer,
+ int[] stripeIdxs)
throws GridDhtInvalidPartitionException {
if (req.size() == 1) {
KeyCacheObject key = req.key(0);
@@ -2811,11 +2895,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
}
else {
- List<GridDhtCacheEntry> locked = new ArrayList<>(req.size());
+ int keysNum = stripeIdxs == null ? req.size() : stripeIdxs.length;
+ List<GridDhtCacheEntry> locked = new ArrayList<>(keysNum);
while (true) {
- for (int i = 0; i < req.size(); i++) {
- GridDhtCacheEntry entry = entryExx(req.key(i), topVer);
+ for (int i = 0; i < keysNum; i++) {
+ int idx = stripeIdxs == null ? i : stripeIdxs[i];
+
+ GridDhtCacheEntry entry = entryExx(req.key(idx), topVer);
locked.add(entry);
}
@@ -3003,25 +3090,27 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
*/
private GridDhtAtomicAbstractUpdateFuture createDhtFuture(
GridCacheVersion writeVer,
- GridNearAtomicAbstractUpdateRequest updateReq
+ GridNearAtomicAbstractUpdateRequest updateReq,
+ int size
) {
- if (updateReq.size() == 1)
+ if (size == 1)
return new GridDhtAtomicSingleUpdateFuture(ctx, writeVer, updateReq);
else
- return new GridDhtAtomicUpdateFuture(ctx, writeVer, updateReq);
+ return new GridDhtAtomicUpdateFuture(ctx, writeVer, updateReq, size);
}
/**
* @param nodeId Sender node ID.
+ * @param stripeIdx Stripe number.
* @param req Near atomic update request.
*/
- private void processNearAtomicUpdateRequest(UUID nodeId, GridNearAtomicAbstractUpdateRequest req) {
+ private void processNearAtomicUpdateRequest(UUID nodeId, GridNearAtomicAbstractUpdateRequest req, int stripeIdx) {
if (msgLog.isDebugEnabled()) {
msgLog.debug("Received near atomic update request [futId=" + req.futureId() +
- ", node=" + nodeId + ']');
+ ", node=" + nodeId + ", stripe=" + stripeIdx + ']');
}
- updateAllAsyncInternal(nodeId, req, updateReplyClos);
+ updateAllAsyncInternal(nodeId, req, stripeIdx, updateReplyClos);
}
/**
@@ -3031,7 +3120,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@SuppressWarnings("unchecked")
private void processNearAtomicUpdateResponse(UUID nodeId, GridNearAtomicUpdateResponse res) {
if (msgLog.isDebugEnabled())
- msgLog.debug("Received near atomic update response [futId" + res.futureId() + ", node=" + nodeId + ']');
+ msgLog.debug("Received near atomic update response " +
+ "[futId=" + res.futureId() + ", node=" + nodeId + ", stripe=" + res.stripe() + ']');
res.nodeId(ctx.localNodeId());
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 5d5ddf0..af9fd8a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -49,11 +49,12 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
GridDhtAtomicUpdateFuture(
GridCacheContext cctx,
GridCacheVersion writeVer,
- GridNearAtomicAbstractUpdateRequest updateReq
+ GridNearAtomicAbstractUpdateRequest updateReq,
+ int size
) {
super(cctx, writeVer, updateReq);
- mappings = U.newHashMap(updateReq.size());
+ mappings = U.newHashMap(size);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 39abb73..770999a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
@@ -138,7 +139,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
/** Future ID. */
@GridToStringInclude
- protected Long futId;
+ protected long futId = -1;
/** Operation result. */
protected GridCacheReturn opRes;
@@ -356,7 +357,6 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
* @return Response to notify about primary failure.
*/
final GridNearAtomicUpdateResponse primaryFailedResponse(GridNearAtomicAbstractUpdateRequest req) {
- assert req.response() == null : req;
assert req.nodeId() != null : req;
if (msgLog.isDebugEnabled()) {
@@ -423,6 +423,12 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
final GridNearAtomicAbstractUpdateRequest req;
/** */
+ private GridNearAtomicUpdateResponse res;
+
+ /** */
+ private Map<Integer, GridNearAtomicUpdateResponse> resMap;
+
+ /** */
@GridToStringInclude
Set<UUID> dhtNodes;
@@ -431,7 +437,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
private Set<UUID> rcvd;
/** */
- private boolean hasRes;
+ private boolean hasDhtRes;
/**
* @param req Request.
@@ -516,13 +522,24 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
}
/**
+ * @return {@code True} if all near results gathered.
+ */
+ private boolean hasAllNearResults() {
+ return res != null || (resMap != null && resMap.size() == req.stripes());
+ }
+
+ public String state() {
+ return resMap != null ? resMap.size() + "/" + req.stripes() : res != null ? "res" : "no";
+ }
+
+ /**
* @return {@code True} if all expected responses are received.
*/
private boolean finished() {
if (req.writeSynchronizationMode() == PRIMARY_SYNC)
- return hasRes;
+ return hasAllNearResults();
- return (dhtNodes != null && dhtNodes.isEmpty()) && hasRes;
+ return (dhtNodes != null && dhtNodes.isEmpty()) && hasDhtRes;
}
/**
@@ -536,13 +553,13 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
* When primary failed, even if primary response is received, it is possible it failed to send
* request to backup(s), need remap operation.
*/
- if (req.fullSync() && !req.nodeFailedResponse()) {
- req.resetResponse();
+ if (req.fullSync() && !nodeFailedResponse()) {
+ resetResponse();
return req;
}
- return req.response() == null ? req : null;
+ return hasAllNearResults() ? req : null;
}
/**
@@ -559,7 +576,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
if (finished())
return null;
- return req.response() == null ? req : null;
+ return !hasAllNearResults() ? req : null;
}
/**
@@ -571,7 +588,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
return DhtLeftResult.NOT_DONE;
if (dhtNodes.remove(nodeId) && dhtNodes.isEmpty()) {
- if (hasRes)
+ if (hasDhtRes)
return DhtLeftResult.DONE;
else
return !req.needPrimaryResponse() ? DhtLeftResult.ALL_RCVD_CHECK_PRIMARY : DhtLeftResult.NOT_DONE;
@@ -592,7 +609,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
return false;
if (res.hasResult())
- hasRes = true;
+ hasDhtRes = true;
if (dhtNodes == null) {
if (rcvd == null)
@@ -614,9 +631,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
boolean onPrimaryResponse(GridNearAtomicUpdateResponse res, GridCacheContext cctx) {
assert !finished() : this;
- hasRes = true;
-
- boolean onRes = req.onResponse(res);
+ boolean onRes = storeResponse(res);
assert onRes;
@@ -639,7 +654,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
* @param cctx Context.
*/
private void initDhtNodes(List<UUID> nodeIds, GridCacheContext cctx) {
- assert dhtNodes == null || req.initMappingLocally();
+ assert F.isEmpty(dhtNodes) || req.initMappingLocally();
Set<UUID> dhtNodes0 = dhtNodes;
@@ -664,12 +679,67 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
dhtNodes = Collections.emptySet();
}
+ /**
+ * @return {@code True} if received notification about primary fail.
+ */
+ boolean nodeFailedResponse() {
+ return res != null && res.nodeLeftResponse();
+ }
+
+ /**
+ * @param res Response.
+ * @return {@code True} if the response was stored, i.e. no response had been stored before (for the same stripe, if the response is striped).
+ */
+ private boolean storeResponse(GridNearAtomicUpdateResponse res) {
+ if (res.stripe() > -1) {
+ if (resMap == null)
+ resMap = U.newHashMap(req.stripes());
+
+ if (!resMap.containsKey(res.stripe())) {
+ resMap.put(res.stripe(), res);
+ return true;
+ }
+ }
+ else {
+ if (this.res == null) {
+ this.res = res;
+// this.resMap = null;
+
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ *
+ */
+ void resetResponse() {
+ res = null;
+ resMap = null;
+ }
+
+ /**
+ * @return Response.
+ */
+ @Nullable public GridNearAtomicUpdateResponse response() {
+ return res;
+ }
+
+ /**
+ * @return Response.
+ */
+ @Nullable public Map<Integer, GridNearAtomicUpdateResponse> responses() {
+ return resMap;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(PrimaryRequestState.class, this,
"primary", primaryId(),
"needPrimaryRes", req.needPrimaryResponse(),
- "primaryRes", req.response() != null,
+ "primaryRes", this.res != null,
"done", finished());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index a43bfb0..b549370 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -89,10 +89,6 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
@GridToStringExclude
protected byte flags;
- /** */
- @GridDirectTransient
- private GridNearAtomicUpdateResponse res;
-
/**
*
*/
@@ -249,41 +245,6 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
}
/**
- * @param res Response.
- * @return {@code True} if current response was {@code null}.
- */
- public boolean onResponse(GridNearAtomicUpdateResponse res) {
- if (this.res == null) {
- this.res = res;
-
- return true;
- }
-
- return false;
- }
-
- /**
- *
- */
- void resetResponse() {
- this.res = null;
- }
-
- /**
- * @return Response.
- */
- @Nullable public GridNearAtomicUpdateResponse response() {
- return res;
- }
-
- /**
- * @return {@code True} if received notification about primary fail.
- */
- boolean nodeFailedResponse() {
- return res != null && res.nodeLeftResponse();
- }
-
- /**
* @return Topology locked flag.
*/
final boolean topologyLocked() {
@@ -451,6 +412,13 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
public abstract int size();
/**
+ * @return Number of stripes.
+ */
+ public int stripes() {
+ return 0;
+ }
+
+ /**
* @param idx Key index.
* @return Key.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
index c381333..5a8c66b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
@@ -21,13 +21,16 @@ import java.io.Externalizable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.GridDirectMap;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
@@ -40,7 +43,6 @@ import org.apache.ignite.internal.processors.cache.distributed.IgniteExternaliza
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -61,6 +63,8 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
/** */
private static final long serialVersionUID = 0L;
+ public static final int DIRECT_TYPE = 40;
+
/** Keys to update. */
@GridToStringInclude
@GridDirectCollection(KeyCacheObject.class)
@@ -70,6 +74,14 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
@GridDirectCollection(CacheObject.class)
private List<CacheObject> vals;
+ /** Number of key indexes recorded so far for each stripe. */
+ @GridDirectTransient
+ private int[] stripeCnt;
+
+ /** Stripe to key indexes mapping. */
+ @GridDirectMap(keyType = int.class, valueType = int[].class)
+ private Map<Integer, int[]> stripeMap;
+
/** Entry processors. */
@GridDirectTransient
private List<EntryProcessor<Object, Object, Object>> entryProcessors;
@@ -109,6 +121,17 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
@GridDirectTransient
private int initSize;
+ /** Maximum number of keys. */
+ @GridDirectTransient
+ private int maxEntryCnt;
+
+ /** Number of stripes on remote node. */
+ @GridDirectTransient
+ private int maxStripes;
+
+ /** Partition Id */
+ private int partId;
+
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -155,7 +178,8 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
boolean skipStore,
boolean keepBinary,
boolean addDepInfo,
- int maxEntryCnt
+ int maxEntryCnt,
+ int maxStripes
) {
super(cacheId,
nodeId,
@@ -181,6 +205,9 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
// than 10, we use it.
initSize = Math.min(maxEntryCnt, 10);
+ this.maxEntryCnt = maxEntryCnt;
+ this.maxStripes = maxStripes;
+
keys = new ArrayList<>(initSize);
}
@@ -200,6 +227,25 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
assert val != null || op == DELETE;
+ if (maxStripes > 1) {
+ int stripe = key.partition() % maxStripes;
+
+ if (stripeCnt == null)
+ stripeCnt = new int[maxStripes];
+
+ if (stripeMap == null)
+ stripeMap = new HashMap<>(maxStripes);
+
+ int[] idxs = stripeMap.get(stripe);
+
+ if (idxs == null)
+ stripeMap.put(stripe, idxs = new int[maxEntryCnt]);
+
+ int idx = stripeCnt[stripe];
+ idxs[idx] = keys.size();
+ stripeCnt[stripe] = idx + 1;
+ }
+
keys.add(key);
if (entryProcessor != null) {
@@ -267,6 +313,11 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
}
/** {@inheritDoc} */
+ @Override public int stripes() {
+ return stripeMap.size();
+ }
+
+ /** {@inheritDoc} */
@Override public KeyCacheObject key(int idx) {
return keys.get(idx);
}
@@ -353,6 +404,13 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
return expiryPlc;
}
+ /**
+ * @return Stripe to key indexes mapping, or {@code null} if no mapping was built.
+ */
+ @Nullable public Map<Integer, int[]> stripeMap() {
+ return stripeMap;
+ }
+
/** {@inheritDoc} */
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
@@ -362,6 +420,14 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
if (expiryPlc != null && expiryPlcBytes == null)
expiryPlcBytes = CU.marshal(cctx, new IgniteExternalizableExpiryPolicy(expiryPlc));
+ if (stripeMap != null && stripeCnt != null) {
+ for (Integer idx : stripeMap.keySet()) {
+ stripeMap.put(idx, Arrays.copyOf(stripeMap.get(idx), stripeCnt[idx]));
+ }
+
+ stripeCnt = null;
+ }
+
prepareMarshalCacheObjects(keys, cctx);
if (filter != null) {
@@ -425,9 +491,14 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
/** {@inheritDoc} */
@Override public int partition() {
- assert !F.isEmpty(keys);
+ return partId;
+ }
- return keys.get(0).partition();
+ /**
+ * @param partId Partition.
+ */
+ public void partition(int partId) {
+ this.partId = partId;
}
/** {@inheritDoc} */
@@ -494,6 +565,18 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
writer.incrementState();
case 18:
+ if (!writer.writeInt("partId", partId))
+ return false;
+
+ writer.incrementState();
+
+ case 19:
+ if (!writer.writeMap("stripeMap", stripeMap, MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR))
+ return false;
+
+ writer.incrementState();
+
+ case 20:
if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
return false;
@@ -580,6 +663,22 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
reader.incrementState();
case 18:
+ partId = reader.readInt("partId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 19:
+ stripeMap = reader.readMap("stripeMap", MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR, false);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 20:
vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -594,24 +693,24 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
/** {@inheritDoc} */
@Override public void cleanup(boolean clearKeys) {
- vals = null;
- entryProcessors = null;
- entryProcessorsBytes = null;
- invokeArgs = null;
- invokeArgsBytes = null;
-
- if (clearKeys)
- keys = null;
+// vals = null;
+// entryProcessors = null;
+// entryProcessorsBytes = null;
+// invokeArgs = null;
+// invokeArgsBytes = null;
+//
+// if (clearKeys)
+// keys = null;
}
/** {@inheritDoc} */
@Override public byte directType() {
- return 40;
+ return DIRECT_TYPE;
}
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 19;
+ return 21;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index 930c4af..2b6d2c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -125,7 +125,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
}
/** {@inheritDoc} */
- @Override public Long id() {
+ @Override public long id() {
synchronized (mux) {
return futId;
}
@@ -216,7 +216,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
AffinityTopologyVersion remapTopVer0;
synchronized (mux) {
- if (futId == null || futId != res.futureId())
+ if (futId == -1 || futId != res.futureId())
return;
assert reqState != null;
@@ -258,7 +258,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
CachePartialUpdateCheckedException err0 = null;
synchronized (mux) {
- if (futId == null || futId != res.futureId())
+ if (futId == -1 || futId != res.futureId())
return;
req = reqState.processPrimaryResponse(nodeId, res);
@@ -331,7 +331,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
* @return Non-null topology version if update should be remapped.
*/
private AffinityTopologyVersion onAllReceived() {
- assert futId != null;
+ assert futId != -1;
AffinityTopologyVersion remapTopVer0 = null;
@@ -362,7 +362,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
cctx.mvcc().removeAtomicFuture(futId);
reqState = null;
- futId = null;
+ futId = -1;
topVer = AffinityTopologyVersion.ZERO;
remapTopVer = null;
@@ -488,7 +488,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
reqState0 = mapSingleUpdate(topVer, futId);
synchronized (mux) {
- assert this.futId == null : this;
+ assert this.futId == -1 : this;
assert this.topVer == AffinityTopologyVersion.ZERO : this;
this.topVer = topVer;
@@ -530,7 +530,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
* @param futId
* @return
*/
- private boolean checkDhtNodes(Long futId) {
+ private boolean checkDhtNodes(long futId) {
GridCacheReturn opRes0 = null;
CachePartialUpdateCheckedException err0 = null;
AffinityTopologyVersion remapTopVer0 = null;
@@ -538,7 +538,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
GridNearAtomicCheckUpdateRequest checkReq = null;
synchronized (mux) {
- if (this.futId == null || !this.futId.equals(futId))
+ if (this.futId == -1 || this.futId != futId)
return false;
assert reqState != null;
@@ -568,13 +568,13 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
/**
* @return Future ID.
*/
- private Long onFutureDone() {
+ private long onFutureDone() {
Long id0;
synchronized (mux) {
id0 = futId;
- futId = null;
+ futId = -1;
}
return id0;
@@ -694,6 +694,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
skipStore,
keepBinary,
cctx.deploymentEnabled(),
+ 1,
1);
}
@@ -721,9 +722,9 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
}
if (nearEnabled) {
- assert reqState.req.response() != null;
+ assert reqState.response() != null;
- updateNear(reqState.req, reqState.req.response());
+ updateNear(reqState.req, reqState.response());
}
onDone(opRes, err);
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index a44ccf9..9c4de66 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -57,6 +57,7 @@ import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_STRIPES_CNT;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
/**
@@ -150,7 +151,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
}
/** {@inheritDoc} */
- @Override public Long id() {
+ @Override public long id() {
synchronized (mux) {
return futId;
}
@@ -167,7 +168,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
List<GridNearAtomicCheckUpdateRequest> checkReqs = null;
synchronized (mux) {
- if (futId == null)
+ if (futId == -1)
return false;
if (singleReq != null) {
@@ -282,10 +283,10 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
retval = Collections.emptyMap();
if (super.onDone(retval, err)) {
- Long futId = onFutureDone();
+ long futVer = onFutureDone();
- if (futId != null)
- cctx.mvcc().removeAtomicFuture(futId);
+ if (futVer > -1)
+ cctx.mvcc().removeAtomicFuture(futVer);
return true;
}
@@ -300,7 +301,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
AffinityTopologyVersion remapTopVer0;
synchronized (mux) {
- if (futId == null || futId != res.futureId())
+ if (futId == -1 || futId != res.futureId())
return;
PrimaryRequestState reqState;
@@ -370,10 +371,10 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
GridCacheReturn opRes0 = null;
CachePartialUpdateCheckedException err0 = null;
- boolean rcvAll;
+ boolean rcvAll = false;
synchronized (mux) {
- if (futId == null || futId != res.futureId())
+ if (futId == -1 || futId != res.futureId())
return;
if (singleReq != null) {
@@ -408,6 +409,11 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
rcvAll = false;
}
+ if (msgLog.isDebugEnabled())
+ msgLog.debug("Processed near atomic update response " +
+ "[futId=" + res.futureId() + ", node=" + nodeId + ", stripe=" + res.stripe() +
+ ": " + resCnt + "/" + mappings.size() + (rcvAll ? " all done" : "") +
+ " " + reqState.state() + ']');
}
else
return;
@@ -466,11 +472,18 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
if (rcvAll && nearEnabled) {
if (mappings != null) {
for (PrimaryRequestState reqState : mappings.values()) {
- GridNearAtomicUpdateResponse res0 = reqState.req.response();
+ if (reqState.responses() != null) {
+ for (GridNearAtomicUpdateResponse res0 : reqState.responses().values()) {
+ updateNear(reqState.req, res0);
+ }
+ }
+ else {
+ GridNearAtomicUpdateResponse res0 = reqState.response();
- assert res0 != null : reqState;
+ assert res0 != null : reqState;
- updateNear(reqState.req, res0);
+ updateNear(reqState.req, res0);
+ }
}
}
else if (!nodeErr)
@@ -534,7 +547,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
* @return Non null topology version if update should be remapped.
*/
@Nullable private AffinityTopologyVersion onAllReceived() {
- assert futId != null;
+ assert futId != -1;
AffinityTopologyVersion remapTopVer0 = null;
@@ -577,7 +590,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
if (remapTopVer0 != null) {
cctx.mvcc().removeAtomicFuture(futId);
- futId = null;
+ futId = -1;
topVer = AffinityTopologyVersion.ZERO;
remapTopVer = null;
@@ -596,7 +609,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
if (nearEnabled) {
if (mappings != null) {
for (PrimaryRequestState reqState : mappings.values()) {
- GridNearAtomicUpdateResponse res0 = reqState.req.response();
+ GridNearAtomicUpdateResponse res0 = reqState.response();
assert res0 != null : reqState;
@@ -604,9 +617,9 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
}
}
else {
- assert singleReq != null && singleReq.req.response() != null;
+ assert singleReq != null && singleReq.response() != null;
- updateNear(singleReq.req, singleReq.req.response());
+ updateNear(singleReq.req, singleReq.response());
}
}
@@ -767,7 +780,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
return;
}
- Long futId = cctx.mvcc().atomicFutureId();
+ long futId = cctx.mvcc().atomicFutureId();
Exception err = null;
PrimaryRequestState singleReq0 = null;
@@ -801,7 +814,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
}
synchronized (mux) {
- assert this.futId == null : this;
+ assert this.futId == -1 : this;
assert this.topVer == AffinityTopologyVersion.ZERO : this;
this.topVer = topVer;
@@ -866,7 +879,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
boolean rcvAll = false;
synchronized (mux) {
- if (this.futId == null || !this.futId.equals(futId))
+ if (this.futId == -1 || this.futId != futId)
return;
if (singleReq != null) {
@@ -934,13 +947,13 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
/**
* @return Future version.
*/
- private Long onFutureDone() {
- Long id0;
+ private long onFutureDone() {
+ long id0;
synchronized (mux) {
id0 = futId;
- futId = null;
+ futId = -1;
}
return id0;
@@ -957,7 +970,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
@SuppressWarnings("ForLoopReplaceableByForEach")
private Map<UUID, PrimaryRequestState> mapUpdate(Collection<ClusterNode> topNodes,
AffinityTopologyVersion topVer,
- Long futId,
+ long futId,
@Nullable Collection<KeyCacheObject> remapKeys,
boolean mappingKnown) throws Exception {
Iterator<?> it = null;
@@ -977,6 +990,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
Map<UUID, PrimaryRequestState> pendingMappings = U.newHashMap(topNodes.size());
+ int part = (int)(futId & 0xFFFF);
+
// Create mappings first, then send messages.
for (Object key : keys) {
if (key == null)
@@ -1045,6 +1060,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
PrimaryRequestState mapped = pendingMappings.get(nodeId);
if (mapped == null) {
+ int stripes = primary.attribute(ATTR_STRIPES_CNT) != null ? (int)primary.attribute(ATTR_STRIPES_CNT) : -1;
+
GridNearAtomicFullUpdateRequest req = new GridNearAtomicFullUpdateRequest(
cctx.cacheId(),
nodeId,
@@ -1063,7 +1080,11 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
skipStore,
keepBinary,
cctx.deploymentEnabled(),
- keys.size());
+ keys.size(),
+ stripes
+ );
+
+ req.partition(part);
mapped = new PrimaryRequestState(req, nodes, false);
@@ -1086,7 +1107,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
* @return Request.
* @throws Exception If failed.
*/
- private PrimaryRequestState mapSingleUpdate(AffinityTopologyVersion topVer, Long futId, boolean mappingKnown)
+ private PrimaryRequestState mapSingleUpdate(AffinityTopologyVersion topVer, long futId, boolean mappingKnown)
throws Exception {
Object key = F.first(keys);
@@ -1168,6 +1189,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
skipStore,
keepBinary,
cctx.deploymentEnabled(),
+ 1,
1);
req.addUpdateEntry(cacheKey,
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index 4e20fc7..f8b45d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -96,6 +96,9 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
/** Partition ID. */
private int partId = -1;
+ /** Stripe. */
+ private int stripe = -1;
+
/** */
@GridDirectCollection(UUID.class)
@GridToStringInclude
@@ -182,6 +185,13 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
}
/**
+ * @param partId Partition ID for proper striping on near node.
+ */
+ public void partition(int partId) {
+ this.partId = partId;
+ }
+
+ /**
* Sets update error.
*
* @param err Error.
@@ -427,6 +437,20 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
return partId;
}
+ /**
+ * @return Stripe number.
+ */
+ public int stripe() {
+ return stripe;
+ }
+
+ /**
+ * @param stripe Stripe number.
+ */
+ public void stripe(int stripe) {
+ this.stripe = stripe;
+ }
+
/** {@inheritDoc} */
@Override public boolean addDeploymentInfo() {
return addDepInfo;
@@ -524,6 +548,12 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
writer.incrementState();
+ case 15:
+ if (!writer.writeInt("stripe", stripe))
+ return false;
+
+ writer.incrementState();
+
}
return true;
@@ -636,6 +666,14 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
reader.incrementState();
+ case 15:
+ stripe = reader.readInt("stripe");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
}
return reader.afterMessageRead(GridNearAtomicUpdateResponse.class);
@@ -648,7 +686,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 15;
+ return 16;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 62aecd1..5844021 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDh
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
@@ -130,7 +131,19 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
GridNearAtomicAbstractUpdateRequest req,
GridNearAtomicUpdateResponse res
) {
- if (F.size(res.failedKeys()) == req.size())
+ int keyNum;
+ int[] stripeIdxs;
+
+ if (res.stripe() > -1 && req instanceof GridNearAtomicFullUpdateRequest) {
+ stripeIdxs = ((GridNearAtomicFullUpdateRequest)req).stripeMap().get(res.stripe());
+ keyNum = stripeIdxs.length;
+ }
+ else {
+ stripeIdxs = null;
+ keyNum = req.keys().size();
+ }
+
+ if (F.size(res.failedKeys()) == keyNum)
return;
/*
@@ -150,11 +163,13 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
- for (int i = 0; i < req.size(); i++) {
- if (F.contains(skipped, i))
+ for (int i = 0; i < keyNum; i++) {
+ int trueIdx = stripeIdxs == null ? i : stripeIdxs[i];
+
+ if (F.contains(skipped, trueIdx))
continue;
- KeyCacheObject key = req.key(i);
+ KeyCacheObject key = req.key(trueIdx);
if (F.contains(failed, key))
continue;
@@ -170,7 +185,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
CacheObject val = null;
- if (F.contains(nearValsIdxs, i)) {
+ if (F.contains(nearValsIdxs, trueIdx)) {
val = res.nearValue(nearValIdx);
nearValIdx++;
@@ -179,7 +194,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
assert req.operation() != TRANSFORM;
if (req.operation() != DELETE)
- val = req.value(i);
+ val = req.value(trueIdx);
}
long ttl = res.nearTtl(i);