You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/03/02 14:29:52 UTC
[2/6] incubator-ignite git commit: # ignite-51
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 45b13b7..d2b7d36 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -464,24 +464,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
entry.cached(cached, null);
-// TODO IGNITE-51.
-// while (true) {
-// GridDhtCacheEntry cached = dhtCache.entryExx(entry.key(), topologyVersion());
-//
-// try {
-// // Set key bytes to avoid serializing in future.
-// cached.keyBytes(entry.keyBytes());
-//
-// entry.cached(cached, null);
-//
-// break;
-// }
-// catch (GridCacheEntryRemovedException ignore) {
-// if (log.isDebugEnabled())
-// log.debug("Got removed entry when adding to dht tx (will retry): " + cached);
-// }
-// }
-
GridCacheVersion explicit = entry.explicitVersion();
if (explicit != null) {
@@ -641,17 +623,15 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
GridDhtTransactionalCacheAdapter<?, ?> dhtCache = cacheCtx.isNear() ? cacheCtx.nearTx().dht() : cacheCtx.dhtTx();
- IgniteInternalFuture<Boolean> fut = null;
-// TODO IGNTIE-51
-// IgniteInternalFuture<Boolean> fut = dhtCache.lockAllAsyncInternal(passedKeys,
-// lockTimeout(),
-// this,
-// isInvalidate(),
-// read,
-// /*retval*/false,
-// isolation,
-// accessTtl,
-// CU.empty());
+ IgniteInternalFuture<Boolean> fut = dhtCache.lockAllAsyncInternal(passedKeys,
+ lockTimeout(),
+ this,
+ isInvalidate(),
+ read,
+ /*retval*/false,
+ isolation,
+ accessTtl,
+ (IgnitePredicate[])CU.empty());
return new GridEmbeddedFuture<>(
fut,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 369935b..7f9022a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -340,7 +340,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
}
if (err != null || procRes != null)
- ret.addEntryProcessResult(key,
+ ret.addEntryProcessResult(key.value(cacheCtx, false),
err == null ? new CacheInvokeResult<>(procRes) : new CacheInvokeResult<>(err));
else
ret.invokeResult(true);
@@ -1240,7 +1240,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
GridDrType drType = cacheCtx.isDrEnabled() ? GridDrType.DR_PRELOAD : GridDrType.DR_NONE;
try {
- if (entry.initialValue(info.value(), null, info.version(),
+ if (entry.initialValue(info.value(), info.version(),
info.ttl(), info.expireTime(), true, topVer, drType)) {
if (rec && !entry.isInternal())
cacheCtx.events().addEvent(entry.partition(), entry.key(), cctx.localNodeId(),
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
index 2423fe1..ec45af1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
@@ -328,73 +328,73 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
}
switch (writer.state()) {
- case 23:
+ case 24:
if (!writer.writeIgniteUuid("futId", futId))
return false;
writer.incrementState();
- case 24:
+ case 25:
if (!writer.writeBitSet("invalidateNearEntries", invalidateNearEntries))
return false;
writer.incrementState();
- case 25:
+ case 26:
if (!writer.writeBoolean("last", last))
return false;
writer.incrementState();
- case 26:
+ case 27:
if (!writer.writeIgniteUuid("miniId", miniId))
return false;
writer.incrementState();
- case 27:
+ case 28:
if (!writer.writeUuid("nearNodeId", nearNodeId))
return false;
writer.incrementState();
- case 28:
+ case 29:
if (!writer.writeCollection("nearWritesBytes", nearWritesBytes, MessageCollectionItemType.BYTE_ARR))
return false;
writer.incrementState();
- case 29:
+ case 30:
if (!writer.writeMessage("nearXidVer", nearXidVer))
return false;
writer.incrementState();
- case 30:
+ case 31:
if (!writer.writeByteArray("ownedBytes", ownedBytes))
return false;
writer.incrementState();
- case 31:
+ case 32:
if (!writer.writeBitSet("preloadKeys", preloadKeys))
return false;
writer.incrementState();
- case 32:
+ case 33:
if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
- case 33:
+ case 34:
if (!writer.writeInt("taskNameHash", taskNameHash))
return false;
writer.incrementState();
- case 34:
+ case 35:
if (!writer.writeLong("topVer", topVer))
return false;
@@ -416,7 +416,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
return false;
switch (reader.state()) {
- case 23:
+ case 24:
futId = reader.readIgniteUuid("futId");
if (!reader.isLastRead())
@@ -424,7 +424,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 24:
+ case 25:
invalidateNearEntries = reader.readBitSet("invalidateNearEntries");
if (!reader.isLastRead())
@@ -432,7 +432,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 25:
+ case 26:
last = reader.readBoolean("last");
if (!reader.isLastRead())
@@ -440,7 +440,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 26:
+ case 27:
miniId = reader.readIgniteUuid("miniId");
if (!reader.isLastRead())
@@ -448,7 +448,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 27:
+ case 28:
nearNodeId = reader.readUuid("nearNodeId");
if (!reader.isLastRead())
@@ -456,7 +456,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 28:
+ case 29:
nearWritesBytes = reader.readCollection("nearWritesBytes", MessageCollectionItemType.BYTE_ARR);
if (!reader.isLastRead())
@@ -464,7 +464,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 29:
+ case 30:
nearXidVer = reader.readMessage("nearXidVer");
if (!reader.isLastRead())
@@ -472,7 +472,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 30:
+ case 31:
ownedBytes = reader.readByteArray("ownedBytes");
if (!reader.isLastRead())
@@ -480,7 +480,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 31:
+ case 32:
preloadKeys = reader.readBitSet("preloadKeys");
if (!reader.isLastRead())
@@ -488,7 +488,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 32:
+ case 33:
subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
@@ -496,7 +496,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 33:
+ case 34:
taskNameHash = reader.readInt("taskNameHash");
if (!reader.isLastRead())
@@ -504,7 +504,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 34:
+ case 35:
topVer = reader.readLong("topVer");
if (!reader.isLastRead())
@@ -524,6 +524,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 35;
+ return 36;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 96777c9..603141b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -60,7 +60,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
private GridCacheContext<K, V> cctx;
/** Keys. */
- private Collection<? extends K> keys;
+ private Collection<KeyCacheObject> keys;
/** Topology version. */
private long topVer;
@@ -127,7 +127,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
*/
public GridPartitionedGetFuture(
GridCacheContext<K, V> cctx,
- Collection<? extends K> keys,
+ Collection<KeyCacheObject> keys,
long topVer,
boolean readThrough,
boolean reload,
@@ -167,21 +167,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
public void init() {
long topVer = this.topVer > 0 ? this.topVer : cctx.affinity().affinityTopologyVersion();
- Collection<KeyCacheObject> keys0 = F.viewReadOnly(keys, new C1<K, KeyCacheObject>() {
- @Override public KeyCacheObject apply(K key) {
- if (key == null) {
- NullPointerException err = new NullPointerException("Null key.");
-
- onDone(err);
-
- throw err;
- }
-
- return cctx.toCacheKeyObject(key);
- }
- });
-
- map(keys0, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), topVer);
+ map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), topVer);
markInitialized();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 5e38db3..adcaebf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -276,6 +276,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
final boolean deserializePortable,
final boolean skipVals
) {
+ ctx.checkSecurity(GridSecurityPermission.CACHE_READ);
+
+ if (F.isEmpty(keys))
+ return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap());
+
+ if (keyCheck)
+ validateCacheKeys(keys);
+
GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall();
subjId = ctx.subjectIdPerCall(null, prj);
@@ -286,7 +294,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
return asyncOp(new CO<IgniteInternalFuture<Map<K, V>>>() {
@Override public IgniteInternalFuture<Map<K, V>> apply() {
- return getAllAsync0(keys,
+ return getAllAsync0(ctx.cacheKeysView(keys),
false,
forcePrimary,
subjId0,
@@ -624,7 +632,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
/** {@inheritDoc} */
- @Override protected IgniteInternalFuture<Boolean> lockAllAsync(Collection<? extends K> keys,
+ @Override protected IgniteInternalFuture<Boolean> lockAllAsync(Collection<KeyCacheObject> keys,
long timeout,
@Nullable IgniteTxLocalEx tx,
boolean isInvalidate,
@@ -895,7 +903,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param skipVals Skip values flag.
* @return Get future.
*/
- private IgniteInternalFuture<Map<K, V>> getAllAsync0(@Nullable Collection<? extends K> keys,
+ private IgniteInternalFuture<Map<K, V>> getAllAsync0(@Nullable Collection<KeyCacheObject> keys,
boolean reload,
boolean forcePrimary,
UUID subjId,
@@ -903,14 +911,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
boolean deserializePortable,
@Nullable ExpiryPolicy expiryPlc,
boolean skipVals) {
- ctx.checkSecurity(GridSecurityPermission.CACHE_READ);
-
- if (F.isEmpty(keys))
- return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap());
-
- if (keyCheck)
- validateCacheKeys(keys);
-
long topVer = ctx.affinity().affinityTopologyVersion();
final IgniteCacheExpiryPolicy expiry = skipVals ? null : expiryPolicy(expiryPlc);
@@ -922,17 +922,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
boolean success = true;
// Optimistically expect that all keys are available locally (avoid creation of get future).
- for (K key : keys) {
- if (key == null)
- throw new NullPointerException("Null key.");
-
- KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
-
+ for (KeyCacheObject key : keys) {
GridCacheEntryEx entry = null;
while (true) {
try {
- entry = ctx.isSwapOrOffheapEnabled() ? entryEx(cacheKey) : peekEx(cacheKey);
+ entry = ctx.isSwapOrOffheapEnabled() ? entryEx(key) : peekEx(key);
// If our DHT cache do has value, then we peek it.
if (entry != null) {
@@ -956,12 +951,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
GridCacheVersion obsoleteVer = context().versions().next();
if (isNew && entry.markObsoleteIfEmpty(obsoleteVer))
- removeIfObsolete(cacheKey);
+ removeIfObsolete(key);
success = false;
}
else
- ctx.addResult(locVals, cacheKey, v, skipVals, false, deserializePortable, false);
+ ctx.addResult(locVals, key, v, skipVals, false, deserializePortable, false);
}
else
success = false;
@@ -998,7 +993,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (success) {
sendTtlUpdateRequest(expiry);
- return ctx.wrapCloneMap(new GridFinishedFuture<>(ctx.kernalContext(), locVals));
+ return new GridFinishedFuture<>(ctx.kernalContext(), locVals);
}
}
@@ -1020,7 +1015,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
fut.init();
- return ctx.wrapCloneMap(fut);
+ return fut;
}
/**
@@ -1700,7 +1695,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
locNodeId,
op,
writeVal,
- null,
req.invokeArguments(),
primary && writeThrough(),
req.returnValue(),
@@ -1965,7 +1959,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
op,
writeVal,
null,
- null,
false,
false,
expiry,
@@ -2460,7 +2453,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
nodeId,
op,
op == TRANSFORM ? entryProcessor : val,
- null,
op == TRANSFORM ? req.invokeArguments() : null,
/*write-through*/false,
/*retval*/false,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index e2b974c..a9a26c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -327,7 +327,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
GridCacheReturn ret = (GridCacheReturn)res;
- if (op != TRANSFORM) {
+ if (op != TRANSFORM && ret != null) {
CacheObject val = (CacheObject)ret.value();
ret.value(CU.value(val, cctx, false));
@@ -357,7 +357,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
if (res.remapKeys() != null) {
assert cctx.config().getAtomicWriteOrderMode() == PRIMARY;
- // TODO IGNITE-51.
mapOnTopology(res.remapKeys(), true, nodeId);
return;
@@ -809,7 +808,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
if (syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY)
- onDone(new GridCacheReturn<Object>(null, true));
+ onDone(new GridCacheReturn(null, true));
}
catch (IgniteCheckedException e) {
onDone(addFailedKeys(req.keys(), e));
@@ -928,8 +927,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
if (err0 == null)
err0 = this.err = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
- // TODO IGNITE-51.
- err0.add(failedKeys, err);
+ List<Object> keys = new ArrayList<>(failedKeys.size());
+
+ for (KeyCacheObject key : failedKeys)
+ keys.add(key.value(cctx, false));
+
+ err0.add(keys, err);
return err0;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 84b32ce..a22b31c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -183,12 +183,20 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
if (F.isEmpty(keys))
return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap());
+ if (keyCheck)
+ validateCacheKeys(keys);
+
IgniteTxLocalAdapter tx = ctx.tm().threadLocalTx();
if (tx != null && !tx.implicit() && !skipTx) {
return asyncOp(tx, new AsyncOp<Map<K, V>>(keys) {
@Override public IgniteInternalFuture<Map<K, V>> op(IgniteTxLocalAdapter tx) {
- return ctx.wrapCloneMap(tx.<K, V>getAllAsync(ctx, keys, entry, deserializePortable, skipVals));
+ return tx.getAllAsync(ctx,
+ ctx.cacheKeysView(keys),
+ entry,
+ deserializePortable,
+ skipVals,
+ false);
}
});
}
@@ -200,7 +208,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
subjId = ctx.subjectIdPerCall(subjId, prj);
return loadAsync(
- keys,
+ ctx.cacheKeysView(keys),
true,
false,
forcePrimary,
@@ -232,9 +240,10 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
* @param taskName Task name.
* @param deserializePortable Deserialize portable flag.
* @param expiryPlc Expiry policy.
+ * @param skipVals Skip values flag.
* @return Loaded values.
*/
- public IgniteInternalFuture<Map<K, V>> loadAsync(@Nullable Collection<? extends K> keys,
+ public IgniteInternalFuture<Map<K, V>> loadAsync(@Nullable Collection<KeyCacheObject> keys,
boolean readThrough,
boolean reload,
boolean forcePrimary,
@@ -248,9 +257,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
if (keys == null || keys.isEmpty())
return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap());
- if (keyCheck)
- validateCacheKeys(keys);
-
if (expiryPlc == null)
expiryPlc = expiryPolicy(null);
@@ -261,17 +267,12 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
boolean success = true;
// Optimistically expect that all keys are available locally (avoid creation of get future).
- for (K key : keys) {
- if (key == null)
- throw new NullPointerException("Null key.");
-
+ for (KeyCacheObject key : keys) {
GridCacheEntryEx entry = null;
- KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
-
while (true) {
try {
- entry = ctx.isSwapOrOffheapEnabled() ? entryEx(cacheKey) : peekEx(cacheKey);
+ entry = ctx.isSwapOrOffheapEnabled() ? entryEx(key) : peekEx(key);
// If our DHT cache do has value, then we peek it.
if (entry != null) {
@@ -295,12 +296,12 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
GridCacheVersion obsoleteVer = context().versions().next();
if (isNew && entry.markObsoleteIfEmpty(obsoleteVer))
- removeIfObsolete(cacheKey);
+ removeIfObsolete(key);
success = false;
}
else
- ctx.addResult(locVals, cacheKey, v, skipVals, false, deserializePortable, true);
+ ctx.addResult(locVals, key, v, skipVals, false, deserializePortable, true);
}
else
success = false;
@@ -337,7 +338,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
if (success) {
sendTtlUpdateRequest(expiryPlc);
- return ctx.wrapCloneMap(new GridFinishedFuture<>(ctx.kernalContext(), locVals));
+ return new GridFinishedFuture<>(ctx.kernalContext(), locVals);
}
}
@@ -360,7 +361,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
fut.init();
- return ctx.wrapCloneMap(fut);
+ return fut;
}
/**
@@ -369,7 +370,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
* {@inheritDoc}
*/
@Override public IgniteInternalFuture<Boolean> lockAllAsync(
- Collection<? extends K> keys,
+ Collection<KeyCacheObject> keys,
long timeout,
@Nullable IgniteTxLocalEx tx,
boolean isInvalidate,
@@ -426,7 +427,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
Collection<KeyCacheObject> locKeys = new ArrayList<>();
for (K key : keys) {
- // TODO IGNITE-51.
KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
GridDistributedCacheEntry entry = peekExx(cacheKey);
@@ -519,7 +519,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
* @param ver Lock version.
* @param keys Keys.
*/
- public void removeLocks(long threadId, GridCacheVersion ver, Collection<? extends K> keys) {
+ public void removeLocks(long threadId, GridCacheVersion ver, Collection<KeyCacheObject> keys) {
if (keys.isEmpty())
return;
@@ -530,11 +530,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
Collection<KeyCacheObject> locKeys = new LinkedList<>();
- for (K key : keys) {
- // TODO IGNITE-51.
- KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
-
- GridCacheMvccCandidate lock = ctx.mvcc().removeExplicitLock(threadId, cacheKey, ver);
+ for (KeyCacheObject key : keys) {
+ GridCacheMvccCandidate lock = ctx.mvcc().removeExplicitLock(threadId, key, ver);
if (lock != null) {
long topVer = lock.topologyVersion();
@@ -559,14 +556,14 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
req.version(ver);
}
- GridCacheEntryEx entry = peekEx(cacheKey);
+ GridCacheEntryEx entry = peekEx(key);
- KeyCacheObject key0 = entry != null ? entry.key() : cacheKey;
+ KeyCacheObject key0 = entry != null ? entry.key() : key;
req.addKey(key0, ctx);
}
else
- locKeys.add(cacheKey);
+ locKeys.add(key);
}
}
@@ -616,7 +613,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
final long threadId,
final GridCacheVersion ver,
final long topVer,
- final Collection<K> keys,
+ final Collection<KeyCacheObject> keys,
final boolean txRead,
final long timeout,
final long accessTtl,
@@ -624,9 +621,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
) {
assert keys != null;
- // TODO IGNITE-51.
- // IgniteInternalFuture<Object> keyFut = ctx.dht().dhtPreloader().request(keys, topVer);
- IgniteInternalFuture<Object> keyFut = null;
+ IgniteInternalFuture<Object> keyFut = ctx.dht().dhtPreloader().request(keys, topVer);
// Prevent embedded future creation if possible.
if (keyFut.isDone()) {
@@ -691,7 +686,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
long threadId,
final GridCacheVersion ver,
final long topVer,
- final Collection<K> keys,
+ final Collection<KeyCacheObject> keys,
final boolean txRead,
final long timeout,
final long accessTtl,
@@ -717,15 +712,12 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
boolean timedout = false;
- for (K key : keys) {
+ for (KeyCacheObject key : keys) {
if (timedout)
break;
- // TODO IGNITE-51.
- KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
-
while (true) {
- GridDhtCacheEntry entry = entryExx(cacheKey, topVer);
+ GridDhtCacheEntry entry = entryExx(key, topVer);
try {
fut.addEntry(key == null ? null : entry);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 7d007a1..29e9730 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -66,7 +66,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
private long threadId;
/** Keys to lock. */
- private Collection<? extends K> keys;
+ private Collection<KeyCacheObject> keys;
/** Future ID. */
private IgniteUuid futId;
@@ -133,7 +133,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
*/
public GridDhtColocatedLockFuture(
GridCacheContext<K, V> cctx,
- Collection<? extends K> keys,
+ Collection<KeyCacheObject> keys,
@Nullable GridNearTxLocal tx,
boolean read,
boolean retval,
@@ -535,8 +535,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
// Continue mapping on the same topology version as it was before.
topSnapshot.compareAndSet(null, snapshot);
- // TODO IGNITE-51.
- // map(keys);
+ map(keys);
markInitialized();
@@ -569,8 +568,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
topSnapshot.compareAndSet(null, snapshot);
- // TODO IGNITE-51.
- // map(keys);
+ map(keys);
markInitialized();
}
@@ -881,9 +879,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
threadId,
lockVer,
topVer,
- // TODO IGNITE-51.
- // keys,
- null,
+ keys,
read,
timeout,
accessTtl,
@@ -1244,7 +1240,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
}
// Set value to detached entry.
- entry.resetFromPrimary(newVal, null, dhtVer);
+ entry.resetFromPrimary(newVal, dhtVer);
if (log.isDebugEnabled())
log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
index 944034c..b9602d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
@@ -46,16 +46,12 @@ public class GridDhtDetachedCacheEntry extends GridDistributedCacheEntry {
* Sets value to detached entry so it can be retrieved in transactional gets.
*
* @param val Value.
- * @param valBytes Value bytes.
* @param ver Version.
* @throws IgniteCheckedException If value unmarshalling failed.
*/
- public void resetFromPrimary(CacheObject val, byte[] valBytes, GridCacheVersion ver)
+ public void resetFromPrimary(CacheObject val, GridCacheVersion ver)
throws IgniteCheckedException {
- if (valBytes != null && val == null)
- val = cctx.marshaller().unmarshal(valBytes, cctx.deploy().globalLoader());
-
- value(val, valBytes);
+ value(val);
this.ver = ver;
}
@@ -66,7 +62,7 @@ public class GridDhtDetachedCacheEntry extends GridDistributedCacheEntry {
}
/** {@inheritDoc} */
- @Override protected void value(@Nullable CacheObject val, @Nullable byte[] valBytes) {
+ @Override protected void value(@Nullable CacheObject val) {
this.val = val;
}
@@ -79,7 +75,7 @@ public class GridDhtDetachedCacheEntry extends GridDistributedCacheEntry {
}
/** {@inheritDoc} */
- @Override protected void updateIndex(CacheObject val, byte[] valBytes, long expireTime,
+ @Override protected void updateIndex(CacheObject val, long expireTime,
GridCacheVersion ver, CacheObject old) throws IgniteCheckedException {
// No-op for detached entries, index is updated on primary nodes.
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
index 66efe48..1338788 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
@@ -503,7 +503,6 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
try {
if (entry.initialValue(
info.value(),
- null,
info.version(),
info.ttl(),
info.expireTime(),
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
index 6ebd8d2..4e5beed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
@@ -506,7 +506,6 @@ public class GridDhtPartitionDemandPool<K, V> {
if (preloadPred == null || preloadPred.apply(entry)) {
if (cached.initialValue(
entry.value(),
- null,
entry.version(),
entry.ttl(),
entry.expireTime(),
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index a30f372..4ebcced 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -219,7 +219,6 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
nodeId,
op,
val,
- valBytes,
null,
/*write-through*/false,
/*retval*/false,
@@ -316,7 +315,6 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
nodeId,
op,
op == TRANSFORM ? entryProcessor : val,
- null,
op == TRANSFORM ? req.invokeArguments() : null,
/*write-through*/false,
/*retval*/false,
@@ -650,7 +648,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
}
/** {@inheritDoc} */
- @Override protected IgniteInternalFuture<Boolean> lockAllAsync(Collection<? extends K> keys,
+ @Override protected IgniteInternalFuture<Boolean> lockAllAsync(Collection<KeyCacheObject> keys,
long timeout,
@Nullable IgniteTxLocalEx tx,
boolean isInvalidate,
@@ -659,7 +657,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
@Nullable TransactionIsolation isolation,
long accessTtl,
IgnitePredicate<Cache.Entry<K, V>>[] filter) {
- return dht.lockAllAsync(keys, timeout, filter);
+ return dht.lockAllAsync(null, timeout, filter);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index a32b6a8..b8f8c6f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -309,7 +309,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
// init() will register future for responses if future has remote mappings.
fut.init();
- return ctx.wrapCloneMap(fut);
+ return fut;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index 2f6bebb..2d859a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -138,7 +138,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
if (isNew() || !valid(topVer)) {
// Version does not change for load ops.
- update(e.value(), null, e.expireTime(), e.ttl(), e.isNew() ? ver : e.version());
+ update(e.value(), e.expireTime(), e.ttl(), e.isNew() ? ver : e.version());
if (cctx.deferredDelete()) {
boolean deleted = val == null;
@@ -176,7 +176,6 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
* This method should be called only when lock is owned on this entry.
*
* @param val Value.
- * @param valBytes Value bytes.
* @param ver Version.
* @param dhtVer DHT version.
* @param primaryNodeId Primary node ID.
@@ -185,27 +184,23 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings( {"RedundantTypeArguments"})
- public boolean resetFromPrimary(CacheObject val, byte[] valBytes, GridCacheVersion ver, GridCacheVersion dhtVer,
- UUID primaryNodeId) throws GridCacheEntryRemovedException, IgniteCheckedException {
+ public boolean resetFromPrimary(CacheObject val,
+ GridCacheVersion ver,
+ GridCacheVersion dhtVer,
+ UUID primaryNodeId)
+ throws GridCacheEntryRemovedException, IgniteCheckedException
+ {
assert dhtVer != null;
cctx.versions().onReceived(primaryNodeId, dhtVer);
-// TODO IGNITE-51.
-// if (valBytes != null && val == null && !cctx.config().isStoreValueBytes()) {
-// GridCacheVersion curDhtVer = dhtVersion();
-//
-// if (!F.eq(dhtVer, curDhtVer))
-// val = cctx.marshaller().<V>unmarshal(valBytes, cctx.deploy().globalLoader());
-// }
-
synchronized (this) {
checkObsolete();
this.primaryNodeId = primaryNodeId;
if (!F.eq(this.dhtVer, dhtVer)) {
- value(val, valBytes);
+ value(val);
this.ver = ver;
this.dhtVer = dhtVer;
@@ -222,14 +217,12 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
*
* @param dhtVer DHT version.
* @param val Value associated with version.
- * @param valBytes Value bytes.
* @param expireTime Expire time.
* @param ttl Time to live.
* @param primaryNodeId Primary node ID.
*/
public void updateOrEvict(GridCacheVersion dhtVer,
@Nullable CacheObject val,
- @Nullable byte[] valBytes,
long expireTime,
long ttl,
UUID primaryNodeId)
@@ -248,7 +241,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
// If cannot evict, then update.
if (this.dhtVer == null) {
if (!markObsolete(dhtVer)) {
- value(val, valBytes);
+ value(val);
ttlAndExpireTimeExtras((int) ttl, expireTime);
@@ -396,7 +389,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
// Change entry only if dht version has changed.
if (!dhtVer.equals(dhtVersion())) {
- update(val, valBytes, expireTime, ttl, ver);
+ update(val, expireTime, ttl, ver);
if (cctx.deferredDelete()) {
boolean deleted = val == null && valBytes == null;
@@ -429,7 +422,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
}
/** {@inheritDoc} */
- @Override protected void updateIndex(CacheObject val, byte[] valBytes, long expireTime,
+ @Override protected void updateIndex(CacheObject val, long expireTime,
GridCacheVersion ver, CacheObject old) throws IgniteCheckedException {
// No-op: queries are disabled for near cache.
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index b880966..d89b59d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -1029,7 +1029,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
// Lock is held at this point, so we can set the
// returned value if any.
- entry.resetFromPrimary(newVal, null, lockVer, dhtVer, node.id());
+ entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id());
entry.readyNearLock(lockVer, mappedVer, res.committedVersions(),
res.rolledbackVersions(), res.pending());
@@ -1387,7 +1387,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
// Lock is held at this point, so we can set the
// returned value if any.
- entry.resetFromPrimary(newVal, null, lockVer, dhtVer, node.id());
+ entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id());
if (inTx() && implicitTx() && tx.onePhaseCommit()) {
boolean pass = res.filterResult(i);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index 6e88c4f..e35e0fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -116,7 +116,12 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
if (tx != null && !tx.implicit() && !skipTx) {
return asyncOp(tx, new AsyncOp<Map<K, V>>(keys) {
@Override public IgniteInternalFuture<Map<K, V>> op(IgniteTxLocalAdapter tx) {
- return ctx.wrapCloneMap(tx.<K, V>getAllAsync(ctx, keys, entry, deserializePortable, skipVals));
+ return tx.getAllAsync(ctx,
+ ctx.cacheKeysView(keys),
+ entry,
+ deserializePortable,
+ skipVals,
+ false);
}
});
}
@@ -400,7 +405,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
/** {@inheritDoc} */
@Override protected IgniteInternalFuture<Boolean> lockAllAsync(
- Collection<? extends K> keys,
+ Collection<KeyCacheObject> keys,
long timeout,
IgniteTxLocalEx tx,
boolean isInvalidate,
@@ -411,9 +416,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
IgnitePredicate<Cache.Entry<K, V>>[] filter
) {
GridNearLockFuture<K, V> fut = new GridNearLockFuture<>(ctx,
- // TODO IGNITE-51
- // keys,
- null,
+ keys,
(GridNearTxLocal)tx,
isRead,
retval,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index d899562..8d6800d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -280,7 +280,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Boolean> loadMissing(
- GridCacheContext cacheCtx,
+ final GridCacheContext cacheCtx,
boolean readThrough,
boolean async,
final Collection<KeyCacheObject> keys,
@@ -288,69 +288,67 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
boolean skipVals,
final IgniteBiInClosure<KeyCacheObject, Object> c
) {
- return null;
-// TODO IGNITE-51.
-// if (cacheCtx.isNear()) {
-// return cacheCtx.nearTx().txLoadAsync(this,
-// keys,
-// readThrough,
-// deserializePortable,
-// accessPolicy(cacheCtx, keys),
-// skipVals).chain(new C1<IgniteInternalFuture<Map<K, V>>, Boolean>() {
-// @Override public Boolean apply(IgniteInternalFuture<Map<K, V>> f) {
-// try {
-// Map<K, V> map = f.get();
-//
-// // Must loop through keys, not map entries,
-// // as map entries may not have all the keys.
-// for (K key : keys)
-// c.apply(key, map.get(key));
-//
-// return true;
-// }
-// catch (Exception e) {
-// setRollbackOnly();
-//
-// throw new GridClosureException(e);
-// }
-// }
-// });
-// }
-// else if (cacheCtx.isColocated()) {
-// return cacheCtx.colocated().loadAsync(keys,
-// readThrough,
-// /*reload*/false,
-// /*force primary*/false,
-// topologyVersion(),
-// CU.subjectId(this, cctx),
-// resolveTaskName(),
-// deserializePortable,
-// accessPolicy(cacheCtx, keys),
-// skipVals).chain(new C1<IgniteInternalFuture<Map<K, V>>, Boolean>() {
-// @Override public Boolean apply(IgniteInternalFuture<Map<K, V>> f) {
-// try {
-// Map<K, V> map = f.get();
-//
-// // Must loop through keys, not map entries,
-// // as map entries may not have all the keys.
-// for (K key : keys)
-// c.apply(key, map.get(key));
-//
-// return true;
-// }
-// catch (Exception e) {
-// setRollbackOnly();
-//
-// throw new GridClosureException(e);
-// }
-// }
-// });
-// }
-// else {
-// assert cacheCtx.isLocal();
-//
-// return super.loadMissing(cacheCtx, readThrough, async, keys, deserializePortable, skipVals, c);
-// }
+ if (cacheCtx.isNear()) {
+ return cacheCtx.nearTx().txLoadAsync(this,
+ keys,
+ readThrough,
+ deserializePortable,
+ accessPolicy(cacheCtx, keys),
+ skipVals).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Boolean>() {
+ @Override public Boolean apply(IgniteInternalFuture<Map<Object, Object>> f) {
+ try {
+ Map<Object, Object> map = f.get();
+
+ // Must loop through keys, not map entries,
+ // as map entries may not have all the keys.
+ for (KeyCacheObject key : keys)
+ c.apply(key, map.get(key.value(cacheCtx, false)));
+
+ return true;
+ }
+ catch (Exception e) {
+ setRollbackOnly();
+
+ throw new GridClosureException(e);
+ }
+ }
+ });
+ }
+ else if (cacheCtx.isColocated()) {
+ return cacheCtx.colocated().loadAsync(keys,
+ readThrough,
+ /*reload*/false,
+ /*force primary*/false,
+ topologyVersion(),
+ CU.subjectId(this, cctx),
+ resolveTaskName(),
+ deserializePortable,
+ accessPolicy(cacheCtx, keys),
+ skipVals).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Boolean>() {
+ @Override public Boolean apply(IgniteInternalFuture<Map<Object, Object>> f) {
+ try {
+ Map<Object, Object> map = f.get();
+
+ // Must loop through keys, not map entries,
+ // as map entries may not have all the keys.
+ for (KeyCacheObject key : keys)
+ c.apply(key, map.get(key.value(cacheCtx, false)));
+
+ return true;
+ }
+ catch (Exception e) {
+ setRollbackOnly();
+
+ throw new GridClosureException(e);
+ }
+ }
+ });
+ }
+ else {
+ assert cacheCtx.isLocal();
+
+ return super.loadMissing(cacheCtx, readThrough, async, keys, deserializePortable, skipVals, c);
+ }
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
index 9af91c6..491a171 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
@@ -949,7 +949,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
IgniteBiTuple<GridCacheVersion, CacheObject> tup = entry.getValue();
- nearEntry.resetFromPrimary(tup.get2(), null, tx.xidVersion(),
+ nearEntry.resetFromPrimary(tup.get2(), tx.xidVersion(),
tup.get1(), m.node().id());
}
else if (txEntry.cached().detached()) {
@@ -957,7 +957,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
IgniteBiTuple<GridCacheVersion, CacheObject> tup = entry.getValue();
- detachedEntry.resetFromPrimary(tup.get2(), null, tx.xidVersion());
+ detachedEntry.resetFromPrimary(tup.get2(), tx.xidVersion());
}
break;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
index 42e8600..e4111ad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
@@ -257,61 +257,61 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
}
switch (writer.state()) {
- case 23:
+ case 24:
if (!writer.writeIgniteUuid("futId", futId))
return false;
writer.incrementState();
- case 24:
+ case 25:
if (!writer.writeBoolean("implicitSingle", implicitSingle))
return false;
writer.incrementState();
- case 25:
+ case 26:
if (!writer.writeBoolean("last", last))
return false;
writer.incrementState();
- case 26:
+ case 27:
if (!writer.writeCollection("lastBackups", lastBackups, MessageCollectionItemType.UUID))
return false;
writer.incrementState();
- case 27:
+ case 28:
if (!writer.writeIgniteUuid("miniId", miniId))
return false;
writer.incrementState();
- case 28:
+ case 29:
if (!writer.writeBoolean("near", near))
return false;
writer.incrementState();
- case 29:
+ case 30:
if (!writer.writeBoolean("retVal", retVal))
return false;
writer.incrementState();
- case 30:
+ case 31:
if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
- case 31:
+ case 32:
if (!writer.writeInt("taskNameHash", taskNameHash))
return false;
writer.incrementState();
- case 32:
+ case 33:
if (!writer.writeLong("topVer", topVer))
return false;
@@ -333,7 +333,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
return false;
switch (reader.state()) {
- case 23:
+ case 24:
futId = reader.readIgniteUuid("futId");
if (!reader.isLastRead())
@@ -341,7 +341,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 24:
+ case 25:
implicitSingle = reader.readBoolean("implicitSingle");
if (!reader.isLastRead())
@@ -349,7 +349,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 25:
+ case 26:
last = reader.readBoolean("last");
if (!reader.isLastRead())
@@ -357,7 +357,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 26:
+ case 27:
lastBackups = reader.readCollection("lastBackups", MessageCollectionItemType.UUID);
if (!reader.isLastRead())
@@ -365,7 +365,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 27:
+ case 28:
miniId = reader.readIgniteUuid("miniId");
if (!reader.isLastRead())
@@ -373,7 +373,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 28:
+ case 29:
near = reader.readBoolean("near");
if (!reader.isLastRead())
@@ -381,7 +381,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 29:
+ case 30:
retVal = reader.readBoolean("retVal");
if (!reader.isLastRead())
@@ -389,7 +389,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 30:
+ case 31:
subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
@@ -397,7 +397,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 31:
+ case 32:
taskNameHash = reader.readInt("taskNameHash");
if (!reader.isLastRead())
@@ -405,7 +405,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 32:
+ case 33:
topVer = reader.readLong("topVer");
if (!reader.isLastRead())
@@ -425,7 +425,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 33;
+ return 34;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
index 273a184..78cfb73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
@@ -224,14 +224,16 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
- GridCacheContext cctx = ctx.cacheContext(cacheId);
-
if (ownedVals != null && ownedValsBytes == null) {
ownedValsBytes = new ArrayList<>(ownedVals.size());
for (Map.Entry<IgniteTxKey, IgniteBiTuple<GridCacheVersion, CacheObject>> entry : ownedVals.entrySet()) {
IgniteBiTuple<GridCacheVersion, CacheObject> tup = entry.getValue();
+ GridCacheContext cctx = ctx.cacheContext(entry.getKey().cacheId());
+
+ entry.getKey().prepareMarshal(cctx);
+
CacheObject val = tup.get2();
if (val != null)
@@ -245,8 +247,11 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
retValBytes = ctx.marshaller().marshal(retVal);
if (filterFailedKeys != null) {
- for (IgniteTxKey key :filterFailedKeys)
+ for (IgniteTxKey key :filterFailedKeys) {
+ GridCacheContext cctx = ctx.cacheContext(key.cacheId());
+
key.prepareMarshal(cctx);
+ }
}
}
@@ -254,8 +259,6 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
@Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);
- GridCacheContext cctx = ctx.cacheContext(cacheId);
-
if (ownedValsBytes != null && ownedVals == null) {
ownedVals = new HashMap<>();
@@ -264,6 +267,10 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
CacheObject val = tup.get3();
+ GridCacheContext cctx = ctx.cacheContext(tup.get1().cacheId());
+
+ tup.get1().finishUnmarshal(cctx, ldr);
+
if (val != null)
val.finishUnmarshal(cctx, ldr);
@@ -275,8 +282,11 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
retVal = ctx.marshaller().unmarshal(retValBytes, ldr);
if (filterFailedKeys != null) {
- for (IgniteTxKey key :filterFailedKeys)
+ for (IgniteTxKey key :filterFailedKeys) {
+ GridCacheContext cctx = ctx.cacheContext(key.cacheId());
+
key.finishUnmarshal(cctx, ldr);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
index 7a0c5d8..4c59437 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
@@ -104,7 +104,7 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> {
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<Boolean> txLockAsync(Collection<? extends K> keys,
+ @Override public IgniteInternalFuture<Boolean> txLockAsync(Collection<KeyCacheObject> keys,
long timeout,
IgniteTxLocalEx tx,
boolean isRead,
@@ -121,7 +121,7 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> {
IgnitePredicate<Cache.Entry<K, V>>[] filter) {
IgniteTxLocalEx tx = ctx.tm().localTx();
- return lockAllAsync(keys, timeout, tx, filter);
+ return lockAllAsync(ctx.cacheKeysView(keys), timeout, tx, filter);
}
/**
@@ -131,7 +131,7 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> {
* @param filter Filter.
* @return Future.
*/
- public IgniteInternalFuture<Boolean> lockAllAsync(Collection<? extends K> keys,
+ public IgniteInternalFuture<Boolean> lockAllAsync(Collection<KeyCacheObject> keys,
long timeout,
@Nullable IgniteTxLocalEx tx,
IgnitePredicate<Cache.Entry<K, V>>[] filter) {
@@ -141,12 +141,12 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> {
GridLocalLockFuture<K, V> fut = new GridLocalLockFuture<>(ctx, keys, tx, this, timeout, filter);
try {
- for (K key : keys) {
+ for (KeyCacheObject key : keys) {
while (true) {
GridLocalCacheEntry entry = null;
try {
- entry = entryExx(ctx.toCacheKeyObject(key));
+ entry = entryExx(key);
if (!ctx.isAll(entry, filter)) {
fut.onFailed();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
index b2797a4..4e769e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
@@ -107,7 +107,7 @@ public final class GridLocalLockFuture<K, V> extends GridFutureAdapter<Boolean>
*/
GridLocalLockFuture(
GridCacheContext<K, V> cctx,
- Collection<? extends K> keys,
+ Collection<KeyCacheObject> keys,
IgniteTxLocalEx tx,
GridLocalCache<K, V> cache,
long timeout,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index 833f8c9..8aa6a63 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -1014,7 +1014,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
try {
entry = entryEx(cacheKey);
- GridTuple3<Boolean, CacheObject, EntryProcessorResult<Object>> t = entry.innerUpdateLocal(
+ GridTuple3<Boolean, Object, EntryProcessorResult<Object>> t = entry.innerUpdateLocal(
ver,
val == null ? DELETE : op,
val,
@@ -1452,7 +1452,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
assert writeVal != null || op == DELETE : "null write value found.";
- GridTuple3<Boolean, CacheObject, EntryProcessorResult<Object>> t = entry.innerUpdateLocal(
+ GridTuple3<Boolean, Object, EntryProcessorResult<Object>> t = entry.innerUpdateLocal(
ver,
op,
writeVal,
@@ -1468,12 +1468,12 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
taskName);
if (intercept) {
- if (op == UPDATE)
+ if (op == UPDATE) {
ctx.config().getInterceptor().onAfterPut(entry.key().value(ctx, false),
- writeVal.value(ctx, false));
+ writeVal.value(ctx, false));
+ }
else
- ctx.config().getInterceptor().onAfterRemove(entry.key().value(ctx, false),
- CU.value(t.get2(), ctx, false));
+ ctx.config().getInterceptor().onAfterRemove(entry.key().value(ctx, false), t.get2());
}
}
catch (GridCacheEntryRemovedException ignore) {
@@ -1555,7 +1555,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<Boolean> txLockAsync(Collection<? extends K> keys,
+ @Override public IgniteInternalFuture<Boolean> txLockAsync(Collection<KeyCacheObject> keys,
long timeout,
IgniteTxLocalEx tx,
boolean isRead,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index c26ca4b..563d38c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -1193,7 +1193,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
* @throws IgniteCheckedException If failed to get previous value for transform.
* @throws GridCacheEntryRemovedException If entry was concurrently deleted.
*/
- protected GridTuple3<GridCacheOperation, CacheObject, byte[]> applyTransformClosures(
+ protected IgniteBiTuple<GridCacheOperation, CacheObject> applyTransformClosures(
IgniteTxEntry txEntry,
boolean metrics) throws GridCacheEntryRemovedException, IgniteCheckedException {
GridCacheContext cacheCtx = txEntry.context();
@@ -1201,10 +1201,10 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
assert cacheCtx != null;
if (isSystemInvalidate())
- return F.t(cacheCtx.writeThrough() ? RELOAD : DELETE, null, null);
+ return F.t(cacheCtx.writeThrough() ? RELOAD : DELETE, null);
if (F.isEmpty(txEntry.entryProcessors()))
- return F.t(txEntry.op(), txEntry.value(), null);
+ return F.t(txEntry.op(), txEntry.value());
else {
try {
boolean recordEvt = cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ);
@@ -1246,14 +1246,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
modified |= invokeEntry.modified();
}
- if (modified) {
- cacheVal = cacheCtx.toCacheObject(val);
-// TODO IGNITE-51
-// val = (V)cacheCtx.<V>unwrapTemporary(val);
-//
-// if (cacheCtx.portableEnabled())
-// val = (V)cacheCtx.marshalToPortable(val);
- }
+ if (modified)
+ cacheVal = cacheCtx.toCacheObject(cacheCtx.unwrapTemporary(val));
GridCacheOperation op = modified ? (val == null ? DELETE : UPDATE) : NOOP;
@@ -1270,7 +1264,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
}
}
- return F.t(op, cacheVal, null);
+ return F.t(op, cacheVal);
}
catch (GridCacheFilterFailedException e) {
assert false : "Empty filter failed for innerGet: " + e;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 6b1881d..66eea11 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -1044,7 +1044,8 @@ public class IgniteTxEntry implements GridPeerDeployAware, Externalizable, Optim
public void readFrom(ObjectInput in) throws IOException, ClassNotFoundException {
hasWriteVal = in.readBoolean();
- val = (CacheObject)in.readObject();
+ if (hasWriteVal)
+ val = (CacheObject)in.readObject();
op = fromOrdinal(in.readInt());
// TODO IGNITE-51.