You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/03/24 14:12:35 UTC
[45/50] [abbrv] ignite git commit: Merge branch master ignite-2.0 to
ignite-3477
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index ef8150c,c20ed48..bc1c584
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@@ -38,7 -38,7 +38,8 @@@ import org.apache.ignite.cluster.Cluste
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.pagemem.wal.StorageException;
+ import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CacheInvokeEntry;
@@@ -2699,24 -2600,12 +2618,15 @@@ public class GridDhtAtomicCache<K, V> e
GridCacheOperation op;
if (putMap != null) {
- // If fast mapping, filter primary keys for write to store.
- Map<KeyCacheObject, CacheObject> storeMap = req.fastMap() ?
- F.view(putMap, new P1<CacheObject>() {
- @Override public boolean apply(CacheObject key) {
- return ctx.affinity().primaryByKey(ctx.localNode(), key, req.topologyVersion());
- }
- }) :
- putMap;
-
try {
- Map<? extends KeyCacheObject, IgniteBiTuple<? extends CacheObject, GridCacheVersion>> view = F.viewReadOnly(storeMap,
- ctx.store().putAll(null, F.viewReadOnly(putMap, new C1<CacheObject, IgniteBiTuple<CacheObject, GridCacheVersion>>() {
- @Override public IgniteBiTuple<CacheObject, GridCacheVersion> apply(CacheObject v) {
- return F.t(v, ver);
- }
- }));
++ Map<? extends KeyCacheObject, IgniteBiTuple<? extends CacheObject, GridCacheVersion>> view = F.viewReadOnly(putMap,
+ new C1<CacheObject, IgniteBiTuple<? extends CacheObject, GridCacheVersion>>() {
+ @Override public IgniteBiTuple<? extends CacheObject, GridCacheVersion> apply(CacheObject val) {
+ return F.t(val, ver);
+ }
+ });
+
+ ctx.store().putAll(null, view);
}
catch (CacheStorePartialUpdateException e) {
storeErr = e;
@@@ -3212,11 -3028,10 +3054,10 @@@
* @param nodeId Sender node ID.
* @param res Near atomic update response.
*/
- @SuppressWarnings("unchecked")
private void processNearAtomicUpdateResponse(UUID nodeId, GridNearAtomicUpdateResponse res) {
if (msgLog.isDebugEnabled())
- msgLog.debug("Received near atomic update response " +
- "[futId=" + res.futureVersion() +
- msgLog.debug("Received near atomic update response [futId" + res.futureId() + ", node=" + nodeId + ']');
++ msgLog.debug("Received near atomic update response [futId" + res.futureId() +
+ ", node=" + nodeId + ']');
res.nodeId(ctx.localNodeId());
@@@ -3254,126 -3100,209 +3126,235 @@@
String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
- for (int i = 0; i < req.size(); i++) {
- KeyCacheObject key = req.key(i);
+ ctx.shared().database().checkpointReadLock();
- try {
- while (true) {
- GridDhtCacheEntry entry = null;
+ try {
+ for (int i = 0; i < req.size(); i++) {
+ KeyCacheObject key = req.key(i);
- try {
- entry = entryExx(key);
+ try {
+ while (true) {
+ GridDhtCacheEntry entry = null;
- CacheObject val = req.value(i);
- CacheObject prevVal = req.previousValue(i);
+ try {
+ entry = entryExx(key);
- EntryProcessor<Object, Object, Object> entryProcessor = req.entryProcessor(i);
- Long updateIdx = req.updateCounter(i);
+ CacheObject val = req.value(i);
+ CacheObject prevVal = req.previousValue(i);
- GridCacheOperation op = entryProcessor != null ? TRANSFORM :
- (val != null) ? UPDATE : DELETE;
+ EntryProcessor<Object, Object, Object> entryProcessor = req.entryProcessor(i);
+ Long updateIdx = req.updateCounter(i);
- long ttl = req.ttl(i);
- long expireTime = req.conflictExpireTime(i);
+ GridCacheOperation op = entryProcessor != null ? TRANSFORM :
+ (val != null) ? UPDATE : DELETE;
- GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
- ver,
- nodeId,
- nodeId,
- op,
- op == TRANSFORM ? entryProcessor : val,
- op == TRANSFORM ? req.invokeArguments() : null,
- /*write-through*/(ctx.store().isLocal() && !ctx.shared().localStorePrimaryOnly())
- && writeThrough() && !req.skipStore(),
- /*read-through*/false,
- /*retval*/false,
- req.keepBinary(),
- /*expiry policy*/null,
- /*event*/true,
- /*metrics*/true,
- /*primary*/false,
- /*check version*/!req.forceTransformBackups(),
- req.topologyVersion(),
- CU.empty0(),
- replicate ? DR_BACKUP : DR_NONE,
- ttl,
- expireTime,
- req.conflictVersion(i),
- false,
- intercept,
- req.subjectId(),
- taskName,
- prevVal,
- updateIdx,
- null);
+ long ttl = req.ttl(i);
+ long expireTime = req.conflictExpireTime(i);
+
+ GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
+ ver,
+ nodeId,
+ nodeId,
+ op,
+ op == TRANSFORM ? entryProcessor : val,
+ op == TRANSFORM ? req.invokeArguments() : null,
+ /*write-through*/(ctx.store().isLocal() && !ctx.shared().localStorePrimaryOnly())
+ && writeThrough() && !req.skipStore(),
+ /*read-through*/false,
+ /*retval*/false,
+ req.keepBinary(),
+ /*expiry policy*/null,
+ /*event*/true,
+ /*metrics*/true,
+ /*primary*/false,
+ /*check version*/!req.forceTransformBackups(),
+ req.topologyVersion(),
+ CU.empty0(),
+ replicate ? DR_BACKUP : DR_NONE,
+ ttl,
+ expireTime,
+ req.conflictVersion(i),
+ false,
+ intercept,
+ req.subjectId(),
+ taskName,
+ prevVal,
+ updateIdx,
+ null);
- if (updRes.removeVersion() != null)
- ctx.onDeferredDelete(entry, updRes.removeVersion());
+ if (updRes.removeVersion() != null)
+ ctx.onDeferredDelete(entry, updRes.removeVersion());
- entry.onUnlock();
+ entry.onUnlock();
- break; // While.
- }
- catch (GridCacheEntryRemovedException ignored) {
- if (log.isDebugEnabled())
- log.debug("Got removed entry while updating backup value (will retry): " + key);
+ break; // While.
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Got removed entry while updating backup value (will retry): " + key);
- entry = null;
- }
- finally {
- if (entry != null)
- ctx.evicts().touch(entry, req.topologyVersion());
- }
+ entry = null;
+ }
+ finally {
+ if (entry != null)
+ ctx.evicts().touch(entry, req.topologyVersion());
}
}
- catch (GridDhtInvalidPartitionException ignored) {
- // Ignore.
- }
- catch (IgniteCheckedException e) {
- res.addFailedKey(key, new IgniteCheckedException("Failed to update key on backup node: " + key, e));
- }
+ }
+ catch (GridDhtInvalidPartitionException ignored) {
+ // Ignore.
+ }
+ catch (IgniteCheckedException e) {
- IgniteCheckedException err =
- new IgniteCheckedException("Failed to update key on backup node: " + key, e);
++ IgniteCheckedException err = new IgniteCheckedException("Failed to update key on backup node: " + key, e);
+
+ if (nearRes != null)
+ nearRes.addFailedKey(key, err);
+
- U.error(log, "Failed to update key on backup node: " + key, e);
++ U.error(log, "Failed to update key on backup node: " + key, e);}
}
}
+ finally {
+ ctx.shared().database().checkpointReadUnlock();
+ }
- if (isNearEnabled(cacheCfg))
- ((GridNearAtomicCache<K, V>)near()).processDhtAtomicUpdateRequest(nodeId, req, res);
+ GridDhtAtomicUpdateResponse dhtRes = null;
+
+ if (isNearEnabled(cacheCfg)) {
+ List<KeyCacheObject> nearEvicted =
+ ((GridNearAtomicCache<K, V>)near()).processDhtAtomicUpdateRequest(nodeId, req, nearRes);
+
+ if (nearEvicted != null) {
+ dhtRes = new GridDhtAtomicUpdateResponse(ctx.cacheId(),
+ req.partition(),
+ req.futureId(),
+ ctx.deploymentEnabled());
+
+ dhtRes.nearEvicted(nearEvicted);
+ }
+ }
+ try {
+ // TODO handle failure: probably drop the node from topology
+ // TODO fire events only after successful fsync
+ if (ctx.shared().wal() != null)
+ ctx.shared().wal().fsync(null);
+ }
+ catch (StorageException e) {
- res.onError(new IgniteCheckedException(e));
++ if (dhtRes != null)
++ dhtRes.onError(new IgniteCheckedException(e));
++
++ if (nearRes != null)
++ nearRes.onClassError(e);
+ }
+ catch (IgniteCheckedException e) {
- res.onError(e);
++ if (dhtRes != null)
++ dhtRes.onError(e);
++
++ if (nearRes != null)
++ nearRes.onClassError(e);
++ }
++
+ if (nearRes != null)
+ sendDhtNearResponse(req, nearRes);
+
+ if (dhtRes == null && req.replyWithoutDelay()) {
+ dhtRes = new GridDhtAtomicUpdateResponse(ctx.cacheId(),
+ req.partition(),
+ req.futureId(),
+ ctx.deploymentEnabled());
}
+ if (dhtRes != null)
+ sendDhtPrimaryResponse(nodeId, req, dhtRes);
+ else
+ sendDeferredUpdateResponse(req.partition(), nodeId, req.futureId());
+ }
+
+ /**
+ * @param nodeId Primary node ID.
+ * @param req Request.
+ * @param dhtRes Response to send.
+ */
+ private void sendDhtPrimaryResponse(UUID nodeId,
+ GridDhtAtomicAbstractUpdateRequest req,
+ GridDhtAtomicUpdateResponse dhtRes) {
try {
- if (res.failedKeys() != null || res.nearEvicted() != null || req.writeSynchronizationMode() == FULL_SYNC) {
- ctx.io().send(nodeId, res, ctx.ioPolicy());
+ ctx.io().send(nodeId, dhtRes, ctx.ioPolicy());
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("Sent DHT atomic update response [futId=" + req.futureVersion() +
- ", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']');
- }
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("Sent DHT response [futId=" + req.futureId() +
+ ", nearFutId=" + req.nearFutureId() +
+ ", writeVer=" + req.writeVersion() +
+ ", node=" + nodeId + ']');
}
- else {
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("Will send deferred DHT atomic update response [futId=" + req.futureVersion() +
- ", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']');
- }
+ }
+ catch (ClusterTopologyCheckedException ignored) {
+ U.warn(msgLog, "Failed to send DHT response, node left [futId=" + req.futureId() +
+ ", nearFutId=" + req.nearFutureId() +
+ ", node=" + nodeId + ']');
+ }
+ catch (IgniteCheckedException e) {
+ U.error(msgLog, "Failed to send DHT near response [futId=" + req.futureId() +
+ ", nearFutId=" + req.nearFutureId() +
+ ", node=" + nodeId +
+ ", res=" + dhtRes + ']', e);
+ }
+ }
+
+ /**
+ * @param part Partition.
+ * @param primaryId Primary ID.
+ * @param futId Future ID.
+ */
+ private void sendDeferredUpdateResponse(int part, UUID primaryId, long futId) {
+ Map<UUID, GridDhtAtomicDeferredUpdateResponse> resMap = defRes.get();
+
+ GridDhtAtomicDeferredUpdateResponse msg = resMap.get(primaryId);
+
+ if (msg == null) {
+ msg = new GridDhtAtomicDeferredUpdateResponse(ctx.cacheId(),
+ new GridLongList(DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE));
+
+ if (DEFERRED_UPDATE_RESPONSE_TIMEOUT > 0) {
+ GridTimeoutObject timeoutSnd = new DeferredUpdateTimeout(part, primaryId);
+
+ msg.timeoutSender(timeoutSnd);
+
+ ctx.time().addTimeoutObject(timeoutSnd);
+ }
+
+ resMap.put(primaryId, msg);
+ }
+
+ GridLongList futIds = msg.futureIds();
+
+ assert futIds.size() < DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE : futIds.size();
+
+ futIds.add(futId);
- // No failed keys and sync mode is not FULL_SYNC, thus sending deferred response.
- sendDeferredUpdateResponse(nodeId, req.futureVersion());
+ if (futIds.size() >= DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE) {
+ resMap.remove(primaryId);
+
+ sendDeferredUpdateResponse(primaryId, msg);
+ }
+ }
+
+ /**
+ * @param primaryId Primary ID.
+ * @param msg Message.
+ */
+ private void sendDeferredUpdateResponse(UUID primaryId, GridDhtAtomicDeferredUpdateResponse msg) {
+ try {
+ GridTimeoutObject timeoutSnd = msg.timeoutSender();
+
+ if (timeoutSnd != null)
+ ctx.time().removeTimeoutObject(timeoutSnd);
+
+ ctx.io().send(primaryId, msg, ctx.ioPolicy());
+
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("Sent deferred DHT update response [futIds=" + msg.futureIds() +
+ ", node=" + primaryId + ']');
}
}
catch (ClusterTopologyCheckedException ignored) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
index 9160865,6811236..9887f55
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
@@@ -140,93 -73,25 +73,27 @@@ public abstract class GridNearAtomicAbs
boolean retval,
@Nullable UUID subjId,
int taskNameHash,
+ boolean mappingKnown,
boolean skipStore,
boolean keepBinary,
+ boolean recovery,
- boolean clientReq,
boolean addDepInfo
) {
- assert futVer != null;
-
- this.cacheId = cacheId;
- this.nodeId = nodeId;
- this.futVer = futVer;
- this.updateVer = updateVer;
- this.topVer = topVer;
- this.syncMode = syncMode;
- this.op = op;
- this.subjId = subjId;
- this.taskNameHash = taskNameHash;
- this.addDepInfo = addDepInfo;
-
- fastMap(fastMap);
- topologyLocked(topLocked);
- returnValue(retval);
- skipStore(skipStore);
- keepBinary(keepBinary);
- clientRequest(clientReq);
- recovery(recovery);
- }
-
- /** {@inheritDoc} */
- @Override public int lookupIndex() {
- return CACHE_MSG_IDX;
- }
-
- /**
- * @return Mapped node ID.
- */
- @Override public UUID nodeId() {
- return nodeId;
- }
-
- /**
- * @param nodeId Node ID.
- */
- @Override public void nodeId(UUID nodeId) {
- this.nodeId = nodeId;
- }
-
- /**
- * @return Subject ID.
- */
- @Override public UUID subjectId() {
- return subjId;
- }
-
- /**
- * @return Task name hash.
- */
- @Override public int taskNameHash() {
- return taskNameHash;
- }
-
- /**
- * @return Future version.
- */
- @Override public GridCacheVersion futureVersion() {
- return futVer;
- }
-
- /**
- * @return Update version for fast-map request.
- */
- @Override public GridCacheVersion updateVersion() {
- return updateVer;
- }
-
- /**
- * @return Topology version.
- */
- @Override public AffinityTopologyVersion topologyVersion() {
- return topVer;
- }
-
- /**
- * @return Cache write synchronization mode.
- */
- @Override public CacheWriteSynchronizationMode writeSynchronizationMode() {
- return syncMode;
+ super(cacheId,
+ nodeId,
+ futId,
+ topVer,
+ topLocked,
+ syncMode,
+ op,
+ retval,
+ subjId,
+ taskNameHash,
+ mappingKnown,
+ skipStore,
+ keepBinary,
++ recovery,
+ addDepInfo);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index b933186,a43bfb0..4b3ea5bc
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@@ -38,15 -48,177 +48,183 @@@ public abstract class GridNearAtomicAbs
/** Message index. */
public static final int CACHE_MSG_IDX = nextIndexId();
+ /** . */
+ private static final int NEED_PRIMARY_RES_FLAG_MASK = 0x01;
+
+ /** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */
+ private static final int TOP_LOCKED_FLAG_MASK = 0x02;
+
+ /** Skip write-through to a persistent storage. */
+ private static final int SKIP_STORE_FLAG_MASK = 0x04;
+
+ /** Keep binary flag. */
+ private static final int KEEP_BINARY_FLAG_MASK = 0x08;
+
+ /** Return value flag. */
+ private static final int RET_VAL_FLAG_MASK = 0x10;
+
++ /** Recovery value flag. */
++ private static final int RECOVERY_FLAG_MASK = 0x20;
++
+ /** Target node ID. */
+ @GridDirectTransient
+ protected UUID nodeId;
+
+ /** Future version. */
+ protected long futId;
+
+ /** Topology version. */
+ protected AffinityTopologyVersion topVer;
+
+ /** Write synchronization mode. */
+ protected CacheWriteSynchronizationMode syncMode;
+
+ /** Update operation. */
+ protected GridCacheOperation op;
+
+ /** Subject ID. */
+ protected UUID subjId;
+
+ /** Task name hash. */
+ protected int taskNameHash;
+
+ /** Compressed boolean flags. Make sure 'toString' is updated when add new flag. */
+ @GridToStringExclude
+ protected byte flags;
+
+ /** */
+ @GridDirectTransient
+ private GridNearAtomicUpdateResponse res;
+
/**
- * @return Mapped node ID.
+ *
*/
- public abstract UUID nodeId();
+ public GridNearAtomicAbstractUpdateRequest() {
+ // No-op.
+ }
/**
+ * Constructor.
+ *
+ * @param cacheId Cache ID.
* @param nodeId Node ID.
+ * @param futId Future ID.
+ * @param topVer Topology version.
+ * @param topLocked Topology locked flag.
+ * @param syncMode Synchronization mode.
+ * @param op Cache update operation.
+ * @param retval Return value required flag.
+ * @param subjId Subject ID.
+ * @param taskNameHash Task name hash code.
+ * @param needPrimaryRes {@code True} if near node waits for primary response.
+ * @param skipStore Skip write-through to a persistent storage.
+ * @param keepBinary Keep binary flag.
+ * @param addDepInfo Deployment info flag.
+ */
+ protected GridNearAtomicAbstractUpdateRequest(
+ int cacheId,
+ UUID nodeId,
+ long futId,
+ @NotNull AffinityTopologyVersion topVer,
+ boolean topLocked,
+ CacheWriteSynchronizationMode syncMode,
+ GridCacheOperation op,
+ boolean retval,
+ @Nullable UUID subjId,
+ int taskNameHash,
+ boolean needPrimaryRes,
+ boolean skipStore,
+ boolean keepBinary,
++ boolean recovery,
+ boolean addDepInfo
+ ) {
+ this.cacheId = cacheId;
+ this.nodeId = nodeId;
+ this.futId = futId;
+ this.topVer = topVer;
+ this.syncMode = syncMode;
+ this.op = op;
+ this.subjId = subjId;
+ this.taskNameHash = taskNameHash;
+ this.addDepInfo = addDepInfo;
+
+ if (needPrimaryRes)
+ needPrimaryResponse(true);
+ if (topLocked)
+ topologyLocked(true);
+ if (retval)
+ returnValue(true);
+ if (skipStore)
+ skipStore(true);
+ if (keepBinary)
+ keepBinary(true);
++ if (recovery)
++ recovery(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public final AffinityTopologyVersion topologyVersion() {
+ return topVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public final int lookupIndex() {
+ return CACHE_MSG_IDX;
+ }
+
+ /** {@inheritDoc} */
+ @Override public final boolean addDeploymentInfo() {
+ return addDepInfo;
+ }
+
+ /** {@inheritDoc} */
+ @Override public final IgniteLogger messageLogger(GridCacheSharedContext ctx) {
+ return ctx.atomicMessageLogger();
+ }
+
+ /**
+ * @return {@code True} if near node is able to initialize update mapping locally.
+ */
+ boolean initMappingLocally() {
+ return !needPrimaryResponse() && fullSync();
+ }
+
+ /**
+ * @return {@code True} if near node waits for primary response.
+ */
+ boolean needPrimaryResponse() {
+ return isFlag(NEED_PRIMARY_RES_FLAG_MASK);
+ }
+
+ /**
+ * @param needRes {@code True} if near node waits for primary response.
+ */
+ void needPrimaryResponse(boolean needRes) {
+ setFlag(needRes, NEED_PRIMARY_RES_FLAG_MASK);
+ }
+
+ /**
+ * @return {@code True} if update is processed in {@link CacheWriteSynchronizationMode#FULL_SYNC} mode.
+ */
+ boolean fullSync() {
+ assert syncMode != null;
+
+ return syncMode == CacheWriteSynchronizationMode.FULL_SYNC;
+ }
+
+ /**
+ * @return Task name hash code.
+ */
+ public int taskNameHash() {
+ return taskNameHash;
+ }
+
+ /**
+ * @return Update opreation.
*/
- public abstract void nodeId(UUID nodeId);
+ public GridCacheOperation operation() {
+ return op;
+ }
/**
* @return Subject ID.
@@@ -111,38 -328,51 +334,65 @@@
/**
* @return Keep binary flag.
*/
- public abstract boolean keepBinary();
+ public final boolean keepBinary() {
+ return isFlag(KEEP_BINARY_FLAG_MASK);
+ }
/**
- * @return Recovery flag.
+ * @param val Keep binary flag.
*/
- public abstract boolean recovery();
+ public void keepBinary(boolean val) {
+ setFlag(val, KEEP_BINARY_FLAG_MASK);
+ }
/**
- * @return Update operation.
++ * @return Keep binary flag.
+ */
- public abstract GridCacheOperation operation();
++ public final boolean recovery() {
++ return isFlag(RECOVERY_FLAG_MASK);
++ }
+
+ /**
- * @return Optional arguments for entry processor.
++ * @param val Keep binary flag.
+ */
- @Nullable public abstract Object[] invokeArguments();
++ public void recovery(boolean val) {
++ setFlag(val, RECOVERY_FLAG_MASK);
++ }
+
+ /**
- * @return Flag indicating whether this request contains primary keys.
+ * Sets flag mask.
+ *
+ * @param flag Set or clear.
+ * @param mask Mask.
*/
- public abstract boolean hasPrimary();
+ private void setFlag(boolean flag, int mask) {
+ flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+ }
/**
- * @param res Response.
- * @return {@code True} if current response was {@code null}.
+ * Reads flag mask.
+ *
+ * @param mask Mask to read.
+ * @return Flag value.
*/
- public abstract boolean onResponse(GridNearAtomicUpdateResponse res);
+ private boolean isFlag(int mask) {
+ return (flags & mask) != 0;
+ }
/**
- * @return Response.
+ * @return Expiry policy.
+ */
+ public abstract ExpiryPolicy expiry();
+
+ /**
+ * @return Filter.
+ */
+ @Nullable public abstract CacheEntryPredicate[] filter();
+
+ /**
+ * @return Optional arguments for entry processor.
*/
- @Nullable public abstract GridNearAtomicUpdateResponse response();
+ @Nullable public abstract Object[] invokeArguments();
/**
* @param key Key to add.
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
index dcaf246,ade9976..4a94c22
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
@@@ -212,26 -152,26 +151,28 @@@ public class GridNearAtomicFullUpdateRe
@Nullable CacheEntryPredicate[] filter,
@Nullable UUID subjId,
int taskNameHash,
+ boolean needPrimaryRes,
boolean skipStore,
boolean keepBinary,
+ boolean recovery,
- boolean clientReq,
boolean addDepInfo,
int maxEntryCnt
) {
- assert futVer != null;
-
- this.cacheId = cacheId;
- this.nodeId = nodeId;
- this.futVer = futVer;
- this.fastMap = fastMap;
- this.updateVer = updateVer;
-
- this.topVer = topVer;
- this.topLocked = topLocked;
- this.syncMode = syncMode;
- this.op = op;
- this.retval = retval;
+ super(cacheId,
+ nodeId,
+ futId,
+ topVer,
+ topLocked,
+ syncMode,
+ op,
+ retval,
+ subjId,
+ taskNameHash,
+ needPrimaryRes,
+ skipStore,
+ keepBinary,
++ recovery,
+ addDepInfo);
this.expiryPlc = expiryPlc;
this.invokeArgs = invokeArgs;
this.filter = filter;
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
index 39b6ab2,c32501a..30197cd
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
@@@ -88,10 -84,9 +84,10 @@@ public class GridNearAtomicSingleUpdate
@Nullable CacheEntryPredicate[] filter,
@Nullable UUID subjId,
int taskNameHash,
+ boolean needPrimaryRes,
boolean skipStore,
boolean keepBinary,
+ boolean recovery,
- boolean clientReq,
boolean addDepInfo
) {
super(
@@@ -107,10 -100,9 +101,10 @@@
retval,
subjId,
taskNameHash,
+ needPrimaryRes,
skipStore,
keepBinary,
+ recovery,
- clientReq,
addDepInfo
);
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index dbff89a,c2372d1..6401fbd
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@@ -108,8 -102,21 +104,22 @@@ public class GridNearAtomicSingleUpdate
int remapCnt,
boolean waitTopFut
) {
- super(cctx, cache, syncMode, op, invokeArgs, retval, rawRetval, expiryPlc, filter, subjId, taskNameHash,
- skipStore, keepBinary, recovery, remapCnt, waitTopFut);
+ super(cctx,
+ cache,
+ syncMode,
+ op,
+ invokeArgs,
+ retval,
+ rawRetval,
+ expiryPlc,
+ filter,
+ subjId,
+ taskNameHash,
+ skipStore,
+ keepBinary,
++ recovery,
+ remapCnt,
+ waitTopFut);
assert subjId != null;
@@@ -378,49 -433,21 +436,21 @@@
/** {@inheritDoc} */
@Override protected void mapOnTopology() {
AffinityTopologyVersion topVer;
- GridCacheVersion futVer;
-
- cache.topology().readLock();
- try {
- if (cache.topology().stopping()) {
- onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
- cache.name()));
+ if (cache.topology().stopping()) {
+ onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
+ cache.name()));
- return;
- }
-
- GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
-
- if (fut.isDone()) {
- Throwable err = fut.validateCache(cctx, recovery, /*read*/false, key, null);
-
- if (err != null) {
- onDone(err);
+ return;
+ }
- return;
- }
+ GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
- topVer = fut.topologyVersion();
+ if (fut.isDone()) {
- Throwable err = fut.validateCache(cctx);
++ Throwable err = fut.validateCache(cctx, recovery, /*read*/false, key, null);
- futVer = addAtomicFuture(topVer);
- }
- else {
- if (waitTopFut) {
- assert !topLocked : this;
-
- fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
- cctx.kernalContext().closure().runLocalSafe(new Runnable() {
- @Override public void run() {
- mapOnTopology();
- }
- });
- }
- });
- }
- else
- onDone(new GridCacheTryPutFailedException());
+ if (err != null) {
+ onDone(err);
return;
}
@@@ -564,10 -628,9 +631,10 @@@
invokeArgs,
subjId,
taskNameHash,
+ needPrimaryRes,
skipStore,
keepBinary,
+ recovery,
- cctx.kernalContext().clientNode(),
cctx.deploymentEnabled());
}
else {
@@@ -585,10 -646,9 +650,10 @@@
retval,
subjId,
taskNameHash,
+ needPrimaryRes,
skipStore,
keepBinary,
+ recovery,
- cctx.kernalContext().clientNode(),
cctx.deploymentEnabled());
}
else {
@@@ -606,10 -664,9 +669,10 @@@
filter,
subjId,
taskNameHash,
+ needPrimaryRes,
skipStore,
keepBinary,
+ recovery,
- cctx.kernalContext().clientNode(),
cctx.deploymentEnabled());
}
}
@@@ -631,10 -686,9 +692,10 @@@
filter,
subjId,
taskNameHash,
+ needPrimaryRes,
skipStore,
keepBinary,
+ recovery,
- cctx.kernalContext().clientNode(),
cctx.deploymentEnabled(),
1);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
index 02cfd91,298ea05..bbcad12
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
@@@ -106,10 -102,9 +102,10 @@@ public class GridNearAtomicSingleUpdate
@Nullable Object[] invokeArgs,
@Nullable UUID subjId,
int taskNameHash,
+ boolean needPrimaryRes,
boolean skipStore,
boolean keepBinary,
+ boolean recovery,
- boolean clientReq,
boolean addDepInfo
) {
super(
@@@ -125,15 -118,15 +119,16 @@@
retval,
subjId,
taskNameHash,
+ needPrimaryRes,
skipStore,
keepBinary,
+ recovery,
- clientReq,
addDepInfo
);
- this.invokeArgs = invokeArgs;
assert op == TRANSFORM : op;
+
+ this.invokeArgs = invokeArgs;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
index 18b6118,14c70aa..94373c4
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
@@@ -100,18 -93,14 +93,15 @@@ public class GridNearAtomicSingleUpdate
boolean retval,
@Nullable UUID subjId,
int taskNameHash,
+ boolean needPrimaryRes,
boolean skipStore,
boolean keepBinary,
+ boolean recovery,
- boolean clientReq,
boolean addDepInfo
) {
- super(
- cacheId,
+ super(cacheId,
nodeId,
- futVer,
- fastMap,
- updateVer,
+ futId,
topVer,
topLocked,
syncMode,
@@@ -119,12 -108,10 +109,12 @@@
retval,
subjId,
taskNameHash,
+ needPrimaryRes,
skipStore,
keepBinary,
- addDepInfo);
+ recovery,
- clientReq,
+ addDepInfo
+ );
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index f24a9b1,a44ccf9..c5824d5
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@@ -490,49 -641,21 +642,21 @@@ public class GridNearAtomicUpdateFutur
/** {@inheritDoc} */
@Override protected void mapOnTopology() {
AffinityTopologyVersion topVer;
- GridCacheVersion futVer;
-
- cache.topology().readLock();
-
- try {
- if (cache.topology().stopping()) {
- onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
- cache.name()));
- return;
- }
+ if (cache.topology().stopping()) {
+ onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
+ cache.name()));
- GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
-
- if (fut.isDone()) {
- Throwable err = fut.validateCache(cctx, recovery, false, null, keys);
+ return;
+ }
- if (err != null) {
- onDone(err);
+ GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
- return;
- }
+ if (fut.isDone()) {
- Throwable err = fut.validateCache(cctx);
++ Throwable err = fut.validateCache(cctx, recovery, false, null, keys);
- topVer = fut.topologyVersion();
-
- futVer = addAtomicFuture(topVer);
- }
- else {
- if (waitTopFut) {
- assert !topLocked : this;
-
- fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
- cctx.kernalContext().closure().runLocalSafe(new Runnable() {
- @Override public void run() {
- mapOnTopology();
- }
- });
- }
- });
- }
- else
- onDone(new GridCacheTryPutFailedException());
+ if (err != null) {
+ onDone(err);
return;
}
@@@ -826,50 -1036,44 +1037,45 @@@
throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
"(all partition nodes left the grid).");
- int i = 0;
-
- for (int n = 0; n < affNodes.size(); n++) {
- ClusterNode affNode = affNodes.get(n);
-
- if (affNode == null)
- throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
- "(all partition nodes left the grid).");
-
- UUID nodeId = affNode.id();
-
- GridNearAtomicFullUpdateRequest mapped = pendingMappings.get(nodeId);
-
- if (mapped == null) {
- mapped = new GridNearAtomicFullUpdateRequest(
- cctx.cacheId(),
- nodeId,
- futVer,
- fastMap,
- updVer,
- topVer,
- topLocked,
- syncMode,
- op,
- retval,
- expiryPlc,
- invokeArgs,
- filter,
- subjId,
- taskNameHash,
- skipStore,
- keepBinary,
- recovery,
- cctx.kernalContext().clientNode(),
- cctx.deploymentEnabled(),
- keys.size());
-
- pendingMappings.put(nodeId, mapped);
- }
+ ClusterNode primary = nodes.get(0);
+
+ boolean needPrimaryRes = !mappingKnown || primary.isLocal();
+
+ UUID nodeId = primary.id();
- mapped.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer, i == 0);
+ PrimaryRequestState mapped = pendingMappings.get(nodeId);
- i++;
+ if (mapped == null) {
+ GridNearAtomicFullUpdateRequest req = new GridNearAtomicFullUpdateRequest(
+ cctx.cacheId(),
+ nodeId,
+ futId,
+ topVer,
+ topLocked,
+ syncMode,
+ op,
+ retval,
+ expiryPlc,
+ invokeArgs,
+ filter,
+ subjId,
+ taskNameHash,
+ needPrimaryRes,
+ skipStore,
+ keepBinary,
++ recovery,
+ cctx.deploymentEnabled(),
+ keys.size());
+
+ mapped = new PrimaryRequestState(req, nodes, false);
+
+ pendingMappings.put(nodeId, mapped);
}
+
+ if (mapped.req.initMappingLocally())
+ mapped.addMapping(nodes);
+
+ mapped.req.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer);
}
return pendingMappings;
@@@ -959,10 -1164,9 +1166,10 @@@
filter,
subjId,
taskNameHash,
+ needPrimaryRes,
skipStore,
keepBinary,
+ recovery,
- cctx.kernalContext().clientNode(),
cctx.deploymentEnabled(),
1);
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index 955b8ba,8b52ba8..1c761c8
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@@ -351,18 -320,10 +320,13 @@@ public class GridNearAtomicUpdateRespon
* @param e Error cause.
*/
public synchronized void addFailedKey(KeyCacheObject key, Throwable e) {
+ assert key != null;
+ assert e != null;
+
- if (failedKeys == null)
- failedKeys = new ConcurrentLinkedQueue<>();
-
- failedKeys.add(key);
+ if (errs == null)
+ errs = new UpdateErrors();
- if (err == null)
- err = new IgniteCheckedException("Failed to update keys on primary node.");
-
- err.addSuppressed(e);
+ errs.addFailedKey(key, e);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index d86dc91,03bbfe0..f922d09
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@@ -208,9 -211,9 +208,11 @@@ public class GridDhtColocatedCache<K, V
final CacheOperationContext opCtx = ctx.operationContextPerCall();
++ final boolean recovery = opCtx != null && opCtx.recovery();
++
if (tx != null && !tx.implicit() && !skipTx) {
return asyncOp(tx, new AsyncOp<V>() {
- @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+ @Override public IgniteInternalFuture<V> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
IgniteInternalFuture<Map<Object, Object>> fut = tx.getAllAsync(ctx,
readyTopVer,
Collections.singleton(ctx.toCacheKeyObject(key)),
@@@ -218,6 -221,6 +220,7 @@@
skipVals,
false,
opCtx != null && opCtx.skipStore(),
++ recovery,
needVer);
return fut.chain(new CX1<IgniteInternalFuture<Map<Object, Object>>, V>() {
@@@ -275,7 -277,6 +278,7 @@@
@Nullable UUID subjId,
String taskName,
final boolean deserializeBinary,
- boolean recovery,
++ final boolean recovery,
final boolean skipVals,
boolean canRemap,
final boolean needVer
@@@ -302,6 -303,6 +305,7 @@@
skipVals,
false,
opCtx != null && opCtx.skipStore(),
++ recovery,
needVer);
}
}, opCtx);
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 31cff03,79c15fb..56dc322
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@@ -161,8 -161,8 +161,11 @@@ public final class GridDhtColocatedLock
private final boolean keepBinary;
/** */
+ private final boolean recovery;
+
++ /** */
+ private int miniId;
+
/**
* @param cctx Registry.
* @param keys Keys to lock.
@@@ -917,38 -914,38 +920,36 @@@
!topLocked &&
(tx == null || !tx.hasRemoteLocks());
- first = false;
- }
-
- assert !implicitTx() && !implicitSingleTx() : tx;
+ first = false;
+ }
- req = new GridNearLockRequest(
- cctx.cacheId(),
- topVer,
- cctx.nodeId(),
- threadId,
- futId,
- lockVer,
- inTx(),
- implicitTx(),
- implicitSingleTx(),
- read,
- retval,
- isolation(),
- isInvalidate(),
- timeout,
- mappedKeys.size(),
- inTx() ? tx.size() : mappedKeys.size(),
- inTx() && tx.syncMode() == FULL_SYNC,
- inTx() ? tx.subjectId() : null,
- inTx() ? tx.taskNameHash() : 0,
- read ? createTtl : -1L,
- req = new GridNearLockRequest(
++ assert !implicitTx() && !implicitSingleTx() : tx;req = new GridNearLockRequest(
+ cctx.cacheId(),
+ topVer,
+ cctx.nodeId(),
+ threadId,
+ futId,
+ lockVer,
+ inTx(),
+ read,
+ retval,
+ isolation(),
+ isInvalidate(),
+ timeout,
+ mappedKeys.size(),
+ inTx() ? tx.size() : mappedKeys.size(),
+ inTx() && tx.syncMode() == FULL_SYNC,
+ inTx() ? tx.subjectId() : null,
+ inTx() ? tx.taskNameHash() : 0,
+ read ? createTtl : -1L,
read ? accessTtl : -1L,
- skipStore,
- keepBinary,
- clientFirst,
- cctx.deploymentEnabled());
+ skipStore,
+ keepBinary,
+ clientFirst,
+ cctx.deploymentEnabled());
- mapping.request(req);
- }
+ mapping.request(req);
+ }
distributedKeys.add(key);
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index c33dc7b,5eacc36..829b29d
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@@ -33,9 -33,9 +34,10 @@@ import java.util.concurrent.atomic.Atom
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
+ import org.apache.ignite.internal.IgniteNeedReconnectException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.PartitionLossPolicy;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.CacheEvent;
import org.apache.ignite.events.DiscoveryEvent;
@@@ -52,9 -51,7 +55,10 @@@ import org.apache.ignite.internal.pagem
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
+import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
+import org.apache.ignite.internal.processors.cache.ClusterState;
+import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
+ import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@@ -74,7 -68,7 +78,8 @@@ import org.apache.ignite.internal.util.
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
+ import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
@@@ -98,9 -88,8 +103,9 @@@ import static org.apache.ignite.interna
/**
* Future for exchanging partition maps.
*/
+@SuppressWarnings({"TypeMayBeWeakened", "unchecked"})
public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityTopologyVersion>
- implements Comparable<GridDhtPartitionsExchangeFuture>, GridDhtTopologyFuture {
+ implements Comparable<GridDhtPartitionsExchangeFuture>, GridDhtTopologyFuture, CachePartitionExchangeWorkerTask {
/** */
public static final int DUMP_PENDING_OBJECTS_THRESHOLD =
IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_DUMP_PENDING_OBJECTS_THRESHOLD, 10);
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
index 12b5204,79c71b3..6aa7441
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
@@@ -391,43 -360,37 +372,43 @@@ public class GridNearGetRequest extend
writer.incrementState();
- case 11:
+ case 10:
- if (!writer.writeBoolean("reload", reload))
+ if (!writer.writeBoolean("recovery", recovery))
return false;
writer.incrementState();
- case 12:
+ case 11:
- if (!writer.writeBoolean("skipVals", skipVals))
+ if (!writer.writeBoolean("reload", reload))
return false;
writer.incrementState();
- case 13:
+ case 12:
- if (!writer.writeUuid("subjId", subjId))
+ if (!writer.writeBoolean("skipVals", skipVals))
return false;
writer.incrementState();
- case 14:
+ case 13:
- if (!writer.writeInt("taskNameHash", taskNameHash))
+ if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
- case 15:
+ case 14:
- if (!writer.writeMessage("topVer", topVer))
+ if (!writer.writeInt("taskNameHash", taskNameHash))
return false;
writer.incrementState();
- case 16:
+ case 15:
+ if (!writer.writeMessage("topVer", topVer))
+ return false;
+
+ writer.incrementState();
+
+ case 17:
if (!writer.writeMessage("ver", ver))
return false;
@@@ -513,55 -468,47 +486,55 @@@
reader.incrementState();
- case 11:
+ case 10:
- reload = reader.readBoolean("reload");
+ recovery = reader.readBoolean("recovery");
if (!reader.isLastRead())
return false;
reader.incrementState();
- case 12:
+ case 11:
- skipVals = reader.readBoolean("skipVals");
+ reload = reader.readBoolean("reload");
if (!reader.isLastRead())
return false;
reader.incrementState();
- case 13:
+ case 12:
- subjId = reader.readUuid("subjId");
+ skipVals = reader.readBoolean("skipVals");
if (!reader.isLastRead())
return false;
reader.incrementState();
- case 14:
+ case 13:
- taskNameHash = reader.readInt("taskNameHash");
+ subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
return false;
reader.incrementState();
- case 15:
+ case 14:
- topVer = reader.readMessage("topVer");
+ taskNameHash = reader.readInt("taskNameHash");
if (!reader.isLastRead())
return false;
reader.incrementState();
- case 16:
+ case 15:
+ topVer = reader.readMessage("topVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 17:
ver = reader.readMessage("ver");
if (!reader.isLastRead())
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index d8f9222,1948df0..0900bac
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@@ -164,9 -164,9 +164,12 @@@ public final class GridNearLockFuture e
/** Keep binary context flag. */
private final boolean keepBinary;
+ /** Recovery mode context flag. */
+ private final boolean recovery;
+
+ /** */
+ private int miniId;
+
/**
* @param cctx Registry.
* @param keys Keys to lock.
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
index ec4b9e5,48b508b..e519707
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
@@@ -398,67 -376,31 +376,31 @@@ public class GridNearLockRequest extend
writer.incrementState();
- case 24:
+ case 25:
- if (!writer.writeBoolean("firstClientReq", firstClientReq))
+ if (!writer.writeByte("flags", flags))
return false;
writer.incrementState();
- case 25:
+ case 26:
- if (!writer.writeBoolean("hasTransforms", hasTransforms))
+ if (!writer.writeInt("miniId", miniId))
return false;
writer.incrementState();
- case 26:
+ case 27:
- if (!writer.writeBoolean("implicitSingleTx", implicitSingleTx))
- return false;
-
- writer.incrementState();
-
- case 28:
- if (!writer.writeBoolean("implicitTx", implicitTx))
- return false;
-
- writer.incrementState();
-
- case 29:
- if (!writer.writeIgniteUuid("miniId", miniId))
- return false;
-
- writer.incrementState();
-
- case 30:
- if (!writer.writeBoolean("onePhaseCommit", onePhaseCommit))
- return false;
-
- writer.incrementState();
-
- case 31:
- if (!writer.writeBoolean("retVal", retVal))
- return false;
-
- writer.incrementState();
-
- case 32:
if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
- case 33:
- if (!writer.writeBoolean("syncCommit", syncCommit))
- return false;
-
- writer.incrementState();
-
- case 34:
- case 27:
++ case 28:
if (!writer.writeInt("taskNameHash", taskNameHash))
return false;
writer.incrementState();
- case 35:
- case 28:
++ case 29:
if (!writer.writeMessage("topVer", topVer))
return false;
@@@ -512,63 -454,23 +454,23 @@@
reader.incrementState();
- case 24:
+ case 25:
- firstClientReq = reader.readBoolean("firstClientReq");
+ flags = reader.readByte("flags");
if (!reader.isLastRead())
return false;
reader.incrementState();
- case 25:
+ case 26:
- hasTransforms = reader.readBoolean("hasTransforms");
+ miniId = reader.readInt("miniId");
if (!reader.isLastRead())
return false;
reader.incrementState();
- case 26:
+ case 27:
- implicitSingleTx = reader.readBoolean("implicitSingleTx");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 28:
- implicitTx = reader.readBoolean("implicitTx");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 29:
- miniId = reader.readIgniteUuid("miniId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 30:
- onePhaseCommit = reader.readBoolean("onePhaseCommit");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 31:
- retVal = reader.readBoolean("retVal");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 32:
subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
@@@ -576,15 -478,7 +478,7 @@@
reader.incrementState();
- case 33:
- syncCommit = reader.readBoolean("syncCommit");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 34:
- case 27:
++ case 28:
taskNameHash = reader.readInt("taskNameHash");
if (!reader.isLastRead())
@@@ -592,7 -486,7 +486,7 @@@
reader.incrementState();
- case 35:
- case 28:
++ case 29:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
index f09b6c8,976f05f..1d610c7
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
@@@ -62,6 -62,19 +62,19 @@@ public abstract class GridNearOptimisti
}
if (topVer != null) {
+ try {
- IgniteCheckedException err = tx.txState().validateTopology(cctx, topologyReadLock());
++ IgniteCheckedException err = tx.txState().validateTopology(cctx, false, topologyReadLock());
+
+ if (err != null) {
+ onDone(err);
+
+ return;
+ }
+ }
+ finally {
+ topologyReadUnlock();
+ }
+
tx.topologyVersion(topVer);
cctx.mvcc().addFuture(this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
index cd6e275,994172b..de69b21
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
@@@ -44,23 -42,20 +42,23 @@@ public class GridNearSingleGetRequest e
private static final long serialVersionUID = 0L;
/** */
- public static final int READ_THROUGH_FLAG_MASK = 0x01;
+ private static final int READ_THROUGH_FLAG_MASK = 0x01;
/** */
- public static final int SKIP_VALS_FLAG_MASK = 0x02;
+ private static final int SKIP_VALS_FLAG_MASK = 0x02;
/** */
- public static final int ADD_READER_FLAG_MASK = 0x04;
+ private static final int ADD_READER_FLAG_MASK = 0x04;
/** */
- public static final int NEED_VER_FLAG_MASK = 0x08;
+ private static final int NEED_VER_FLAG_MASK = 0x08;
/** */
- public static final int NEED_ENTRY_INFO_FLAG_MASK = 0x10;
+ private static final int NEED_ENTRY_INFO_FLAG_MASK = 0x10;
+ /** */
+ public static final int RECOVERY_FLAG_MASK = 0x20;
+
/** Future ID. */
private long futId;
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index 1bb39e2,5ad05b0..1468e8a
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@@ -43,7 -43,7 +43,6 @@@ import org.apache.ignite.internal.proce
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlockRequest;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
--import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
@@@ -121,7 -121,6 +120,7 @@@ public class GridNearTransactionalCache
@Nullable UUID subjId,
String taskName,
final boolean deserializeBinary,
- boolean recovery,
++ final boolean recovery,
final boolean skipVals,
boolean canRemap,
final boolean needVer
@@@ -150,6 -149,6 +149,7 @@@
skipVals,
false,
skipStore,
++ recovery,
needVer);
}
}, opCtx);