You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/03/02 14:29:51 UTC
[1/6] incubator-ignite git commit: # ignite-51
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-51 2f43d32d1 -> 7f9a6630b
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 167c2e9..7f39503 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -366,7 +366,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
return new GridFinishedFuture<>(cctx.kernalContext(), e);
}
}
- else
+ else {
return cctx.kernalContext().closure().callLocalSafe(
new GPC<Boolean>() {
@Override public Boolean call() throws Exception {
@@ -381,6 +381,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
}
},
true);
+ }
}
/**
@@ -502,8 +503,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
if (intercept || !F.isEmpty(e.entryProcessors()))
e.cached().unswap(true, false);
- // TODO IGNITE-51 (do not need convert to CacheObject to pass to store?).
- GridTuple3<GridCacheOperation, CacheObject, byte[]> res = applyTransformClosures(e, false);
+ IgniteBiTuple<GridCacheOperation, CacheObject> res = applyTransformClosures(e, false);
GridCacheContext cacheCtx = e.context();
@@ -716,7 +716,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
if (!F.isEmpty(txEntry.entryProcessors()) || !F.isEmpty(txEntry.filters()))
txEntry.cached().unswap(true, false);
- GridTuple3<GridCacheOperation, CacheObject, byte[]> res = applyTransformClosures(txEntry,
+ IgniteBiTuple<GridCacheOperation, CacheObject> res = applyTransformClosures(txEntry,
true);
// For near local transactions we must record DHT version
@@ -740,7 +740,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
GridCacheOperation op = res.get1();
CacheObject val = res.get2();
- byte[] valBytes = res.get3();
// Deal with conflicts.
GridCacheVersion explicitVer = txEntry.conflictVersion() != null ?
@@ -814,7 +813,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
eventNodeId(),
txEntry.nodeId(),
val,
- valBytes,
false,
false,
txEntry.ttl(),
@@ -834,7 +832,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
eventNodeId(),
nodeId,
val,
- valBytes,
false,
false,
txEntry.ttl(),
@@ -1101,7 +1098,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
/**
* Checks if there is a cached or swapped value for
- * {@link #getAllAsync(GridCacheContext, Collection, GridCacheEntryEx, boolean, boolean)} method.
+ * {@link #getAllAsync(GridCacheContext, Collection, GridCacheEntryEx, boolean, boolean, boolean)} method.
*
* @param cacheCtx Cache context.
* @param keys Key to enlist.
@@ -1112,20 +1109,22 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
* @param keysCnt Keys count (to avoid call to {@code Collection.size()}).
* @param deserializePortable Deserialize portable flag.
* @param skipVals Skip values flag.
+ * @param keepCacheObjects Keep cache objects flag.
* @throws IgniteCheckedException If failed.
* @return Enlisted keys.
*/
@SuppressWarnings({"RedundantTypeArguments"})
private <K, V> Collection<KeyCacheObject> enlistRead(
final GridCacheContext cacheCtx,
- Collection<? extends K> keys,
+ Collection<KeyCacheObject> keys,
@Nullable GridCacheEntryEx cached,
@Nullable ExpiryPolicy expiryPlc,
Map<K, V> map,
Map<KeyCacheObject, GridCacheVersion> missed,
int keysCnt,
boolean deserializePortable,
- boolean skipVals
+ boolean skipVals,
+ boolean keepCacheObjects
) throws IgniteCheckedException {
assert !F.isEmpty(keys);
assert keysCnt == keys.size();
@@ -1144,16 +1143,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
// In this loop we cover only read-committed or optimistic transactions.
// Transactions that are pessimistic and not read-committed are covered
// outside of this loop.
- for (K key : keys) {
- if (key == null)
- continue;
-
+ for (KeyCacheObject key : keys) {
if (pessimistic() && !readCommitted() && !skipVals)
addActiveCache(cacheCtx);
- KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key);
-
- IgniteTxKey txKey = cacheCtx.txKey(cacheKey);
+ IgniteTxKey txKey = cacheCtx.txKey(key);
// Check write map (always check writes first).
IgniteTxEntry txEntry = entry(txKey);
@@ -1167,14 +1161,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
if (!F.isEmpty(txEntry.entryProcessors()))
val = txEntry.applyEntryProcessors(val);
- if (val != null) {
- Object val0 = val.value(cacheCtx, true);
-
- if (cacheCtx.portableEnabled())
- val0 = cacheCtx.unwrapPortableIfNeeded(val0, !deserializePortable);
-
- map.put(key, (V)CU.skipValue(val0, skipVals));
- }
+ if (val != null)
+ cacheCtx.addResult(map, key, val, skipVals, keepCacheObjects, deserializePortable, false);
}
else {
assert txEntry.op() == TRANSFORM || (groupLock() && !txEntry.groupLockEntry());
@@ -1205,15 +1193,16 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
if (!F.isEmpty(txEntry.entryProcessors()))
val = txEntry.applyEntryProcessors(val);
- Object val0 = val.value(cacheCtx, !skipVals);
-
- if (cacheCtx.portableEnabled())
- val0 = cacheCtx.unwrapPortableIfNeeded(val0, !deserializePortable);
-
- map.put(key, (V)CU.skipValue(val0, skipVals));
+ cacheCtx.addResult(map,
+ key,
+ val,
+ skipVals,
+ keepCacheObjects,
+ deserializePortable,
+ false);
}
else
- missed.put(cacheKey, txEntry.cached().version());
+ missed.put(key, txEntry.cached().version());
break;
}
@@ -1233,10 +1222,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
// First time access within transaction.
else {
if (lockKeys == null && !skipVals)
- lockKeys = single ? Collections.singleton(cacheKey) : new ArrayList<KeyCacheObject>(keysCnt);
+ lockKeys = single ? Collections.singleton(key) : new ArrayList<KeyCacheObject>(keysCnt);
if (!single && !skipVals)
- lockKeys.add(cacheKey);
+ lockKeys.add(key);
while (true) {
GridCacheEntryEx entry;
@@ -1273,19 +1262,20 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
accessPlc);
if (val != null) {
- Object val0 = val.value(cacheCtx, false);
-
- if (cacheCtx.portableEnabled())
- val0 = cacheCtx.unwrapPortableIfNeeded(val0, !deserializePortable);
-
- map.put(key, (V)CU.skipValue(val0, skipVals));
+ cacheCtx.addResult(map,
+ key,
+ val,
+ skipVals,
+ keepCacheObjects,
+ deserializePortable,
+ false);
}
else
- missed.put(cacheKey, ver);
+ missed.put(key, ver);
}
else
// We must wait for the lock in pessimistic mode.
- missed.put(cacheKey, ver);
+ missed.put(key, ver);
if (!readCommitted() && !skipVals) {
txEntry = addEntry(READ,
@@ -1392,22 +1382,24 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
/**
* Loads all missed keys for
- * {@link #getAllAsync(GridCacheContext, Collection, GridCacheEntryEx, boolean, boolean)} method.
+ * {@link #getAllAsync(GridCacheContext, Collection, GridCacheEntryEx, boolean, boolean, boolean)} method.
*
* @param cacheCtx Cache context.
* @param map Return map.
* @param missedMap Missed keys.
* @param redos Keys to retry.
* @param deserializePortable Deserialize portable flag.
+ * @param keepCacheObjects Keep cache objects flag.
* @return Loaded key-value pairs.
*/
private <K, V> IgniteInternalFuture<Map<K, V>> checkMissed(
final GridCacheContext cacheCtx,
final Map<K, V> map,
final Map<KeyCacheObject, GridCacheVersion> missedMap,
- @Nullable final Collection<K> redos,
+ @Nullable final Collection<KeyCacheObject> redos,
final boolean deserializePortable,
- final boolean skipVals
+ final boolean skipVals,
+ final boolean keepCacheObjects
) {
assert redos != null || pessimistic();
@@ -1443,7 +1435,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
CacheObject cacheVal = cacheCtx.toCacheObject(val);
- Object visibleVal = val;
+ CacheObject visibleVal = cacheVal;
IgniteTxKey txKey = cacheCtx.txKey(key);
@@ -1506,16 +1498,30 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
if (readCommitted() || groupLock() || skipVals) {
cacheCtx.evicts().touch(e, topologyVersion());
- if (visibleVal != null)
- map.put(key.<K>value(cacheCtx, false), (V)CU.skipValue(visibleVal, skipVals));
+ if (visibleVal != null) {
+ cacheCtx.addResult(map,
+ key,
+ visibleVal,
+ skipVals,
+ keepCacheObjects,
+ deserializePortable,
+ false);
+ }
}
else {
assert txEntry != null;
txEntry.setAndMarkValid(cacheVal);
- if (visibleVal != null)
- map.put(key.<K>value(cacheCtx, false), (V)visibleVal);
+ if (visibleVal != null) {
+ cacheCtx.addResult(map,
+ key,
+ visibleVal,
+ skipVals,
+ keepCacheObjects,
+ deserializePortable,
+ false);
+ }
}
loaded.add(key);
@@ -1575,10 +1581,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
/** {@inheritDoc} */
@Override public <K, V> IgniteInternalFuture<Map<K, V>> getAllAsync(
final GridCacheContext cacheCtx,
- Collection<? extends K> keys,
+ Collection<KeyCacheObject> keys,
@Nullable GridCacheEntryEx cached,
final boolean deserializePortable,
- final boolean skipVals) {
+ final boolean skipVals,
+ final boolean keepCacheObjects) {
if (F.isEmpty(keys))
return new GridFinishedFuture<>(cctx.kernalContext(), Collections.<K, V>emptyMap());
@@ -1595,7 +1602,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
final Map<KeyCacheObject, GridCacheVersion> missed = new GridLeanMap<>(pessimistic() ? keysCnt : 0);
- GridCacheProjectionImpl<K, V> prj = cacheCtx.projectionPerCall();
+ GridCacheProjectionImpl prj = cacheCtx.projectionPerCall();
ExpiryPolicy expiryPlc = prj != null ? prj.expiry() : null;
@@ -1607,7 +1614,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
missed,
keysCnt,
deserializePortable,
- skipVals);
+ skipVals,
+ keepCacheObjects);
if (single && missed.isEmpty())
return new GridFinishedFuture<>(cctx.kernalContext(), retMap);
@@ -1636,7 +1644,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
// Load keys only after the locks have been acquired.
for (KeyCacheObject cacheKey : lockKeys) {
- K keyVal = cacheKey.<K>value(cacheCtx, false);
+ K keyVal = (K)(keepCacheObjects ? cacheKey : cacheKey.value(cacheCtx, false));
if (retMap.containsKey(keyVal))
// We already have a return value.
@@ -1682,10 +1690,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
if (!F.isEmpty(txEntry.entryProcessors()))
val0 = txEntry.applyEntryProcessors(val0);
- if (cacheCtx.portableEnabled())
- val0 = cacheCtx.unwrapPortableIfNeeded(val0, !deserializePortable);
-
- retMap.put(keyVal, (V)val0);
+ cacheCtx.addResult(retMap,
+ cacheKey,
+ val,
+ skipVals,
+ keepCacheObjects,
+ deserializePortable,
+ false);
}
// Even though we bring the value back from lock acquisition,
@@ -1718,8 +1729,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
}
}
- if (!missed.isEmpty() && (cacheCtx.isReplicated() || cacheCtx.isLocal()))
- return checkMissed(cacheCtx, retMap, missed, null, deserializePortable, skipVals);
+ if (!missed.isEmpty() && (cacheCtx.isReplicated() || cacheCtx.isLocal())) {
+ return checkMissed(cacheCtx,
+ retMap,
+ missed,
+ null,
+ deserializePortable,
+ skipVals,
+ keepCacheObjects);
+ }
return new GridFinishedFuture<>(cctx.kernalContext(), Collections.<K, V>emptyMap());
}
@@ -1764,12 +1782,14 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
else {
assert optimistic() || readCommitted() || groupLock() || skipVals;
- final Collection<K> redos = new ArrayList<>();
+ final Collection<KeyCacheObject> redos = new ArrayList<>();
if (!missed.isEmpty()) {
if (!readCommitted())
for (Iterator<KeyCacheObject> it = missed.keySet().iterator(); it.hasNext(); ) {
- K keyVal = it.next().value(cacheCtx, false);
+ KeyCacheObject cacheKey = it.next();
+
+ K keyVal = (K)(keepCacheObjects ? cacheKey : cacheKey.value(cacheCtx, false));
if (retMap.containsKey(keyVal))
it.remove();
@@ -1781,7 +1801,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
return new GridEmbeddedFuture<>(
cctx.kernalContext(),
// First future.
- checkMissed(cacheCtx, retMap, missed, redos, deserializePortable, skipVals),
+ checkMissed(cacheCtx, retMap, missed, redos, deserializePortable, skipVals, keepCacheObjects),
// Closure that returns another future, based on result from first.
new PMC<Map<K, V>>() {
@Override public IgniteInternalFuture<Map<K, V>> postMiss(Map<K, V> map) {
@@ -1793,27 +1813,37 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
log.debug("Starting to future-recursively get values for keys: " + redos);
// Future recursion.
- return getAllAsync(cacheCtx, redos, null, deserializePortable, skipVals);
+ return getAllAsync(cacheCtx,
+ redos,
+ null,
+ deserializePortable,
+ skipVals,
+ true);
}
},
// Finalize.
new FinishClosure<Map<K, V>>() {
@Override Map<K, V> finish(Map<K, V> loaded) {
for (Map.Entry<K, V> entry : loaded.entrySet()) {
- // TODO IGNITE-51.
- KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(entry.getKey());
+ KeyCacheObject cacheKey = (KeyCacheObject)entry.getKey();
IgniteTxEntry txEntry = entry(cacheCtx.txKey(cacheKey));
- V val = entry.getValue();
+ CacheObject val = (CacheObject)entry.getValue();
if (!readCommitted())
- txEntry.readValue(cacheCtx.toCacheObject(val));
+ txEntry.readValue(val);
if (!F.isEmpty(txEntry.entryProcessors()))
val = txEntry.applyEntryProcessors(val);
- retMap.put(entry.getKey(), val);
+ cacheCtx.addResult(retMap,
+ cacheKey,
+ val,
+ skipVals,
+ keepCacheObjects,
+ deserializePortable,
+ false);
}
return retMap;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
index 0391e58..43737e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
@@ -69,14 +69,17 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
* @param cached Cached entry if this method is called from entry wrapper.
* Cached entry is passed if and only if there is only one key in collection of keys.
* @param deserializePortable Deserialize portable flag.
+ * @param skipVals Skip values flag.
+ * @param keepCacheObjects Keep cache objects
* @return Future for this get.
*/
public <K, V> IgniteInternalFuture<Map<K, V>> getAllAsync(
GridCacheContext cacheCtx,
- Collection<? extends K> keys,
+ Collection<KeyCacheObject> keys,
@Nullable GridCacheEntryEx cached,
boolean deserializePortable,
- boolean skipVals);
+ boolean skipVals,
+ boolean keepCacheObjects);
/**
* @param cacheCtx Cache context.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java
index e7c401c..f9cb4cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java
@@ -367,8 +367,8 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay
// TODO IGNITE-51.
Collection<? extends IgniteDataLoaderEntry> entries0 = F.viewReadOnly(entries, new C1<Entry<K, V>, IgniteDataLoaderEntry>() {
@Override public IgniteDataLoaderEntry apply(Entry<K, V> e) {
- KeyCacheObject key = cacheObjProc.toCacheKeyObject(null, e.getKey());
- CacheObject val = cacheObjProc.toCacheObject(null, e.getValue());
+ KeyCacheObject key = cacheObjProc.toCacheKeyObject(cacheObjCtx, e.getKey());
+ CacheObject val = cacheObjProc.toCacheObject(cacheObjCtx, e.getValue(), null);
return new IgniteDataLoaderEntry(key, val);
}
@@ -461,8 +461,8 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay
@Override public IgniteFuture<?> addData(K key, V val) {
A.notNull(key, "key");
- KeyCacheObject key0 = cacheObjProc.toCacheKeyObject(null, key);
- CacheObject val0 = cacheObjProc.toCacheObject(null, val);
+ KeyCacheObject key0 = cacheObjProc.toCacheKeyObject(cacheObjCtx, key);
+ CacheObject val0 = cacheObjProc.toCacheObject(cacheObjCtx, val, null);
return addDataInternal(Collections.singleton(new IgniteDataLoaderEntry(key0, val0)));
}
@@ -1332,7 +1332,6 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay
entry.unswap(true, false);
entry.initialValue(e.getValue(),
- null,
ver,
CU.TTL_ETERNAL,
CU.EXPIRE_TIME_ETERNAL,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableProcessor.java
index 880445f..5da5ee9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableProcessor.java
@@ -158,7 +158,7 @@ public interface GridPortableProcessor extends GridProcessor {
* @param obj Object.
* @return Cache object.
*/
- @Nullable public CacheObject toCacheObject(CacheObjectContext ctx, @Nullable Object obj);
+ @Nullable public CacheObject toCacheObject(CacheObjectContext ctx, @Nullable Object obj, byte[] bytes);
/**
* @param obj Key value.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/os/GridOsPortableProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/os/GridOsPortableProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/os/GridOsPortableProcessor.java
index 8738bf4..414c292 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/os/GridOsPortableProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/os/GridOsPortableProcessor.java
@@ -143,10 +143,13 @@ public class GridOsPortableProcessor extends IgniteCacheObjectProcessorAdapter {
}
/** {@inheritDoc} */
- @Nullable @Override public CacheObject toCacheObject(CacheObjectContext ctx, @Nullable Object obj) {
- if (obj == null || obj instanceof CacheObject)
+ @Nullable @Override public CacheObject toCacheObject(CacheObjectContext ctx, @Nullable Object obj, byte[] bytes) {
+ if ((obj == null && bytes == null) || obj instanceof CacheObject)
return (CacheObject)obj;
+ if (bytes != null)
+ return new CacheObjectImpl(obj, bytes);
+
return new UserCacheObjectImpl(obj);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index 1aa7c96..b53386c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -1460,8 +1460,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
m.put(null, 2);
GridTestUtils.assertThrows(log, new Callable<Void>() {
- @Override
- public Void call() throws Exception {
+ @Override public Void call() throws Exception {
cache.putAll(m);
return null;
@@ -1480,8 +1479,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
m.put("key4", null);
GridTestUtils.assertThrows(log, new Callable<Void>() {
- @Override
- public Void call() throws Exception {
+ @Override public Void call() throws Exception {
cache.putAll(m);
return null;
@@ -1696,7 +1694,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
IgniteFuture<Integer> fut1 = cacheAsync.future();
- assert fut1.get() == null;
+ assertNull(fut1.get());
assertEquals((Integer)1, cache.get("key"));
cacheAsync.getAndPutIfAbsent("key", 2);
@@ -3934,7 +3932,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
CacheAffinity<Integer> aff = ignite(0).affinity(null);
boolean near = cfg.getDistributionMode() == CacheDistributionMode.NEAR_PARTITIONED ||
- cfg.getDistributionMode() == CacheDistributionMode.NEAR_ONLY;
+ cfg.getDistributionMode() == CacheDistributionMode.NEAR_ONLY;
ClusterNode locNode = ignite(0).cluster().localNode();
@@ -3964,7 +3962,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
assertEquals(i, getObj1.val);
- if (loc)
+ if (loc && !offHeapValues())
assertSame("Same expected [key=" + i + ", primary=" + primary + ", backup=" + backup + ']',
putObj1,
getObj1);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index 6cdd97e..40691d8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -453,7 +453,6 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
UUID evtNodeId,
UUID affNodeId,
@Nullable CacheObject val,
- @Nullable byte[] valBytes,
boolean writeThrough,
boolean retval,
long ttl,
@@ -467,7 +466,7 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
}
/** {@inheritDoc} */
- @Override public GridTuple3<Boolean, CacheObject, EntryProcessorResult<Object>> innerUpdateLocal(
+ @Override public GridTuple3<Boolean, Object, EntryProcessorResult<Object>> innerUpdateLocal(
GridCacheVersion ver,
GridCacheOperation op,
@Nullable Object writeObj,
@@ -492,7 +491,6 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
UUID affNodeId,
GridCacheOperation op,
@Nullable Object val,
- @Nullable byte[] valBytes,
@Nullable Object[] invokeArgs,
boolean writeThrough,
boolean retval,
@@ -653,7 +651,6 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
/** @inheritDoc */
@Override public boolean initialValue(CacheObject val,
- @Nullable byte[] valBytes,
GridCacheVersion ver,
long ttl,
long expireTime,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedFullApiSelfTest.java
index 3336798..4b5a260 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedFullApiSelfTest.java
@@ -38,6 +38,7 @@ public class GridCacheReplicatedFullApiSelfTest extends GridCacheAbstractFullApi
return PARTITIONED_ONLY;
}
+ /** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration c = super.getConfiguration(gridName);
[3/6] incubator-ignite git commit: # ignite-51
Posted by sb...@apache.org.
# ignite-51
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a040311d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a040311d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a040311d
Branch: refs/heads/ignite-51
Commit: a040311d2eb88d4d3e49aa9c885007c85a6a442b
Parents: bdb0f55
Author: sboikov <sb...@gridgain.com>
Authored: Mon Mar 2 10:13:46 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Mar 2 16:03:57 2015 +0300
----------------------------------------------------------------------
.../processors/cache/CacheObjectAdapter.java | 1 +
.../processors/cache/GridCacheAdapter.java | 46 ++---
.../processors/cache/GridCacheContext.java | 45 ++--
.../processors/cache/GridCacheEntryEx.java | 15 +-
.../processors/cache/GridCacheMapEntry.java | 206 +++++++++----------
.../processors/cache/GridCacheMessage.java | 28 ++-
.../processors/cache/GridCacheSwapManager.java | 44 ++--
.../processors/cache/KeyCacheObjectImpl.java | 5 +-
.../cache/UserKeyCacheObjectImpl.java | 31 ++-
.../GridDistributedCacheAdapter.java | 14 +-
.../GridDistributedTxPrepareRequest.java | 110 +++++++---
.../GridDistributedTxRemoteAdapter.java | 25 ++-
.../distributed/dht/GridDhtCacheAdapter.java | 2 +-
.../distributed/dht/GridDhtCacheEntry.java | 30 ++-
.../cache/distributed/dht/GridDhtGetFuture.java | 29 ++-
.../distributed/dht/GridDhtLockFuture.java | 2 +-
.../dht/GridDhtTransactionalCacheAdapter.java | 14 +-
.../distributed/dht/GridDhtTxLocalAdapter.java | 38 +---
.../distributed/dht/GridDhtTxPrepareFuture.java | 4 +-
.../dht/GridDhtTxPrepareRequest.java | 50 ++---
.../dht/GridPartitionedGetFuture.java | 20 +-
.../dht/atomic/GridDhtAtomicCache.java | 42 ++--
.../dht/atomic/GridNearAtomicUpdateFuture.java | 13 +-
.../dht/colocated/GridDhtColocatedCache.java | 68 +++---
.../colocated/GridDhtColocatedLockFuture.java | 16 +-
.../colocated/GridDhtDetachedCacheEntry.java | 12 +-
.../dht/preloader/GridDhtForceKeysFuture.java | 1 -
.../preloader/GridDhtPartitionDemandPool.java | 1 -
.../distributed/near/GridNearAtomicCache.java | 6 +-
.../distributed/near/GridNearCacheAdapter.java | 2 +-
.../distributed/near/GridNearCacheEntry.java | 29 +--
.../distributed/near/GridNearLockFuture.java | 4 +-
.../near/GridNearTransactionalCache.java | 13 +-
.../cache/distributed/near/GridNearTxLocal.java | 126 ++++++------
.../near/GridNearTxPrepareFuture.java | 4 +-
.../near/GridNearTxPrepareRequest.java | 42 ++--
.../near/GridNearTxPrepareResponse.java | 22 +-
.../processors/cache/local/GridLocalCache.java | 10 +-
.../cache/local/GridLocalLockFuture.java | 2 +-
.../local/atomic/GridLocalAtomicCache.java | 14 +-
.../cache/transactions/IgniteTxAdapter.java | 18 +-
.../cache/transactions/IgniteTxEntry.java | 3 +-
.../transactions/IgniteTxLocalAdapter.java | 170 ++++++++-------
.../cache/transactions/IgniteTxLocalEx.java | 7 +-
.../dataload/IgniteDataLoaderImpl.java | 9 +-
.../portable/GridPortableProcessor.java | 2 +-
.../portable/os/GridOsPortableProcessor.java | 7 +-
.../cache/GridCacheAbstractFullApiSelfTest.java | 12 +-
.../processors/cache/GridCacheTestEntryEx.java | 5 +-
.../GridCacheReplicatedFullApiSelfTest.java | 1 +
50 files changed, 725 insertions(+), 695 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java
index 8edc6c8..5cf3521 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java
@@ -43,6 +43,7 @@ public abstract class CacheObjectAdapter implements CacheObject, Externalizable
}
/**
+ * @param ctx Context.
* @return {@code True} need to copy value returned to user.
*/
protected boolean needCopy(GridCacheContext ctx) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 7cb0b1f..27d0065 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -558,7 +558,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
* @return Locks future.
*/
public abstract IgniteInternalFuture<Boolean> txLockAsync(
- Collection<? extends K> keys,
+ Collection<KeyCacheObject> keys,
long timeout,
IgniteTxLocalEx tx,
boolean isRead,
@@ -2233,16 +2233,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
if (keyCheck)
validateCacheKeys(keys);
- Collection<KeyCacheObject> keys0 = F.viewReadOnly(keys, new C1<K, KeyCacheObject>() {
- @Override public KeyCacheObject apply(K key) {
- if (key == null)
- throw new NullPointerException("Null key.");
-
- return ctx.toCacheKeyObject(key);
- }
- });
-
- return getAllAsync0(keys0,
+ return getAllAsync0(ctx.cacheKeysView(keys),
readThrough,
checkTx,
subjId,
@@ -2263,7 +2254,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
* @param expiry Expiry policy.
* @param skipVals Skip values flag.
* @param keepCacheObjects Keep cache objects
- * @return
+ * @return Future.
*/
public <K1, V1> IgniteInternalFuture<Map<K1, V1>> getAllAsync0(@Nullable final Collection<KeyCacheObject> keys,
boolean readThrough,
@@ -2502,13 +2493,11 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
}
}
else {
- return null;
-// TODO IGNITE-51.
-// return asyncOp(tx, new AsyncOp<Map<K, V>>(keys) {
-// @Override public IgniteInternalFuture<Map<K, V>> op(IgniteTxLocalAdapter tx) {
-// return ctx.wrapCloneMap(tx.<K, V>getAllAsync(ctx, keys, cached0, deserializePortable, skipVals));
-// }
-// });
+ return asyncOp(tx, new AsyncOp<Map<K1, V1>>(keys) {
+ @Override public IgniteInternalFuture<Map<K1, V1>> op(IgniteTxLocalAdapter tx) {
+ return tx.getAllAsync(ctx, keys, null, deserializePortable, skipVals, false);
+ }
+ });
}
}
@@ -4166,7 +4155,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
GridCacheEntryEx entry = entryEx(key, false);
try {
- entry.initialValue(cacheVal, null, ver, ttl, CU.EXPIRE_TIME_CALCULATE, false, topVer,
+ entry.initialValue(cacheVal, ver, ttl, CU.EXPIRE_TIME_CALCULATE, false, topVer,
replicate ? DR_LOAD : DR_NONE);
}
catch (IgniteCheckedException e) {
@@ -4615,8 +4604,12 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
GridCacheEntryEx entry = peekEx(cacheKey);
try {
- if (entry == null || entry.obsolete() || entry.isNewLocked())
+ if (entry == null || entry.obsolete() || entry.isNewLocked()) {
+ if (entry != null)
+ cacheKey = entry.key();
+
unswap.add(cacheKey);
+ }
}
catch (GridCacheEntryRemovedException ignored) {
// No-op.
@@ -5856,7 +5849,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
private final boolean single;
/** Keys. */
- private final Collection<? extends K> keys;
+ private final Collection<?> keys;
/**
* @param key Key.
@@ -5870,7 +5863,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
/**
* @param keys Keys involved.
*/
- protected AsyncOp(Collection<? extends K> keys) {
+ protected AsyncOp(Collection<?> keys) {
this.keys = keys;
single = keys.size() == 1;
@@ -5884,13 +5877,6 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
}
/**
- * @return Keys.
- */
- Collection<? extends K> keys() {
- return keys;
- }
-
- /**
* @param tx Transaction.
* @return Operation return value.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 3b4e3df..3cbe8a1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -1322,26 +1322,6 @@ public class GridCacheContext<K, V> implements Externalizable {
}
/**
- * @param f Target future.
- * @return Wrapped future that is aware of cloning behaviour.
- */
- public IgniteInternalFuture<Map<K, V>> wrapCloneMap(IgniteInternalFuture<Map<K, V>> f) {
- if (!hasFlag(CLONE))
- return f;
-
- return f.chain(new CX1<IgniteInternalFuture<Map<K, V>>, Map<K, V>>() {
- @Override public Map<K, V> applyx(IgniteInternalFuture<Map<K, V>> f) throws IgniteCheckedException {
- Map<K, V> map = new GridLeanMap<>();
-
- for (Map.Entry<K, V> e : f.get().entrySet())
- map.put(e.getKey(), cloneValue(e.getValue()));
-
- return map;
- }
- });
- }
-
- /**
* Creates Runnable that can be executed safely in a different thread inheriting
* the same thread local projection as for the current thread. If no projection is
* set for current thread then there's no need to create new object and method simply
@@ -1780,7 +1760,15 @@ public class GridCacheContext<K, V> implements Externalizable {
* @return Cache object.
*/
@Nullable public CacheObject toCacheObject(@Nullable Object obj) {
- return portable().toCacheObject(cacheObjCtx, obj);
+ return portable().toCacheObject(cacheObjCtx, obj, null);
+ }
+
+ /**
+ * @param obj Object.
+ * @return Cache object.
+ */
+ @Nullable public CacheObject toCacheObject(@Nullable Object obj, byte[] bytes) {
+ return portable().toCacheObject(cacheObjCtx, obj, bytes);
}
/**
@@ -1946,6 +1934,21 @@ public class GridCacheContext<K, V> implements Externalizable {
mgr.printMemoryStats();
}
+ /**
+ * @param keys Keys.
+ * @return Co
+ */
+ public Collection<KeyCacheObject> cacheKeysView(Collection<?> keys) {
+ return F.viewReadOnly(keys, new C1<Object, KeyCacheObject>() {
+ @Override public KeyCacheObject apply(Object key) {
+ if (key == null)
+ throw new NullPointerException("Null key.");
+
+ return toCacheKeyObject(key);
+ }
+ });
+ };
+
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
U.writeString(out, gridName());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 8ed070e..30df242 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -244,7 +244,6 @@ public interface GridCacheEntryEx {
* @return Swap entry if this entry was marked obsolete, {@code null} if entry was not evicted.
* @throws IgniteCheckedException If failed.
*/
- // TODO IGNITE-51
public GridCacheBatchSwapEntry evictInBatchInternal(GridCacheVersion obsoleteVer) throws IgniteCheckedException;
/**
@@ -333,7 +332,6 @@ public interface GridCacheEntryEx {
* @param evtNodeId ID of node responsible for this change.
* @param affNodeId Partitioned node iD.
* @param val Value to set.
- * @param valBytes Value bytes to set.
* @param writeThrough If {@code true} then persist to storage.
* @param retval {@code True} if value should be returned (and unmarshalled if needed).
* @param ttl Time to live.
@@ -356,7 +354,6 @@ public interface GridCacheEntryEx {
UUID evtNodeId,
UUID affNodeId,
@Nullable CacheObject val,
- @Nullable byte[] valBytes,
boolean writeThrough,
boolean retval,
long ttl,
@@ -412,7 +409,6 @@ public interface GridCacheEntryEx {
* @param affNodeId Affinity node ID.
* @param op Update operation.
* @param val Value. Type depends on operation.
- * @param valBytes Value bytes. Can be non-null only if operation is UPDATE.
* @param invokeArgs Optional arguments for entry processor.
* @param writeThrough Write through flag.
* @param retval Return value flag.
@@ -445,7 +441,6 @@ public interface GridCacheEntryEx {
UUID affNodeId,
GridCacheOperation op,
@Nullable Object val,
- @Nullable byte[] valBytes,
@Nullable Object[] invokeArgs,
boolean writeThrough,
boolean retval,
@@ -485,7 +480,7 @@ public interface GridCacheEntryEx {
* @throws IgniteCheckedException If update failed.
* @throws GridCacheEntryRemovedException If entry is obsolete.
*/
- public GridTuple3<Boolean, CacheObject, EntryProcessorResult<Object>> innerUpdateLocal(
+ public GridTuple3<Boolean, Object, EntryProcessorResult<Object>> innerUpdateLocal(
GridCacheVersion ver,
GridCacheOperation op,
@Nullable Object writeObj,
@@ -501,7 +496,6 @@ public interface GridCacheEntryEx {
String taskName
) throws IgniteCheckedException, GridCacheEntryRemovedException;
-
/**
* Marks entry as obsolete and, if possible or required, removes it
* from swap storage.
@@ -662,7 +656,6 @@ public interface GridCacheEntryEx {
* Sets new value if current version is <tt>0</tt>
*
* @param val New value.
- * @param valBytes Value bytes.
* @param ver Version to use.
* @param ttl Time to live.
* @param expireTime Expiration time.
@@ -674,7 +667,6 @@ public interface GridCacheEntryEx {
* @throws GridCacheEntryRemovedException If entry was removed.
*/
public boolean initialValue(CacheObject val,
- @Nullable byte[] valBytes,
GridCacheVersion ver,
long ttl,
long expireTime,
@@ -692,7 +684,6 @@ public interface GridCacheEntryEx {
* @throws IgniteCheckedException In case of error.
* @throws GridCacheEntryRemovedException If entry was removed.
*/
- // TODO IGNITE-51
public boolean initialValue(KeyCacheObject key, GridCacheSwapEntry unswapped)
throws IgniteCheckedException, GridCacheEntryRemovedException;
@@ -715,7 +706,9 @@ public interface GridCacheEntryEx {
* @throws IgniteCheckedException If index could not be updated.
* @throws GridCacheEntryRemovedException If entry was removed.
*/
- public boolean versionedValue(CacheObject val, @Nullable GridCacheVersion curVer, @Nullable GridCacheVersion newVer)
+ public boolean versionedValue(CacheObject val,
+ @Nullable GridCacheVersion curVer,
+ @Nullable GridCacheVersion newVer)
throws IgniteCheckedException, GridCacheEntryRemovedException;
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 74f337b..77a7343 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -120,7 +120,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
private final int hash;
/** Off-heap value pointer. */
- private long valPtr;
+ protected long valPtr;
/** Extras */
@GridToStringInclude
@@ -162,7 +162,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
val = (CacheObject)cctx.kernalContext().portable().detachPortable(val, cctx);
synchronized (this) {
- value(val, null);
+ value(val);
}
next(hdrId, next);
@@ -181,9 +181,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
* Sets entry value. If off-heap value storage is enabled, will serialize value to off-heap.
*
* @param val Value to store.
- * @param valBytes Value bytes to store.
*/
- protected void value(@Nullable CacheObject val, @Nullable byte[] valBytes) {
+ protected void value(@Nullable CacheObject val) {
assert Thread.holdsLock(this);
// In case we deal with IGFS cache, count updated data
@@ -205,17 +204,18 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
else {
try {
if (cctx.kernalContext().config().isPeerClassLoadingEnabled()) {
- if (val != null || valBytes != null) {
- if (val == null)
- val = cctx.marshaller().unmarshal(valBytes, cctx.deploy().globalLoader());
+ Object val0 = null;
- if (val != null)
- cctx.gridDeploy().deploy(val.getClass(), val.getClass().getClassLoader());
+ if (val != null) {
+ val0 = val.value(cctx, false);
+
+ if (val0 != null)
+ cctx.gridDeploy().deploy(val0.getClass(), val0.getClass().getClassLoader());
}
- if (U.p2pLoader(val)) {
+ if (U.p2pLoader(val0)) {
cctx.deploy().addDeploymentContext(
- new GridDeploymentInfoBean((GridDeploymentInfo)val.getClass().getClassLoader()));
+ new GridDeploymentInfoBean((GridDeploymentInfo)val0.getClass().getClassLoader()));
}
}
@@ -527,7 +527,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
val = (CacheObject)cctx.kernalContext().portable().detachPortable(val, cctx);
// Set unswapped value.
- update(val, e.valueBytes(), e.expireTime(), e.ttl(), e.version());
+ update(val, e.expireTime(), e.ttl(), e.version());
// Must update valPtr again since update() will reset it.
if (cctx.offheapTiered() && e.offheapPointer() > 0)
@@ -598,7 +598,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
/**
* @return Value bytes and flag indicating whether value is byte array.
*/
- private IgniteBiTuple<byte[], Boolean> valueBytes0() {
+ protected IgniteBiTuple<byte[], Boolean> valueBytes0() {
if (valPtr != 0) {
assert isOffHeapValuesOnly() || cctx.offheapTiered();
@@ -784,7 +784,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
if (expired) {
expiredVal = val;
- value(null, null);
+ value(null);
}
if (old == null && !hasOldBytes) {
@@ -879,12 +879,12 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
if (loadedFromStore)
// Update indexes before actual write to entry.
- updateIndex(ret, null, expTime, nextVer, prevVal);
+ updateIndex(ret, expTime, nextVer, prevVal);
boolean hadValPtr = valPtr != 0;
// Don't change version for read-through.
- update(ret, null, expTime, ttl, nextVer);
+ update(ret, expTime, ttl, nextVer);
if (hadValPtr && cctx.offheapTiered())
cctx.swap().removeOffheap(key);
@@ -956,7 +956,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
// Update indexes.
if (ret != null) {
- updateIndex(ret, null, expTime, nextVer, old);
+ updateIndex(ret, expTime, nextVer, old);
if (cctx.deferredDelete() && !isInternal() && !detached() && deletedUnlocked())
deletedUnlocked(false);
@@ -968,7 +968,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
deletedUnlocked(true);
}
- update(ret, null, expTime, ttl, nextVer);
+ update(ret, expTime, ttl, nextVer);
touch = true;
@@ -1000,7 +1000,6 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
UUID evtNodeId,
UUID affNodeId,
CacheObject val,
- @Nullable byte[] valBytes,
boolean writeThrough,
boolean retval,
long ttl,
@@ -1065,11 +1064,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
if (interceptorVal == null)
return new GridCacheUpdateTxResult(false, (CacheObject)cctx.unwrapTemporary(old));
- else if (interceptorVal != val0) {
+ else if (interceptorVal != val0)
val = cctx.toCacheKeyObject(cctx.unwrapTemporary(interceptorVal));
-
- valBytes = null;
- }
}
// Determine new ttl and expire time.
@@ -1097,16 +1093,16 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
// Update index inside synchronization since it can be updated
// in load methods without actually holding entry lock.
- if (val != null || valBytes != null) {
- updateIndex(val, valBytes, expireTime, newVer, old);
+ if (val != null) {
+ updateIndex(val, expireTime, newVer, old);
if (cctx.deferredDelete() && deletedUnlocked() && !isInternal() && !detached())
deletedUnlocked(false);
}
- update(val, valBytes, expireTime, ttl, newVer);
+ update(val, expireTime, ttl, newVer);
- drReplicate(drType, val, valBytes, newVer);
+ drReplicate(drType, val, newVer);
recordNodeId(affNodeId);
@@ -1231,7 +1227,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
boolean hadValPtr = valPtr != 0;
- update(null, null, 0, 0, newVer);
+ update(null, 0, 0, newVer);
if (cctx.offheapTiered() && hadValPtr) {
boolean rmv = cctx.swap().removeOffheap(key);
@@ -1254,7 +1250,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
}
}
- drReplicate(drType, null, null, newVer);
+ drReplicate(drType, null, newVer);
if (metrics && cctx.cache().configuration().isStatisticsEnabled())
cctx.cache().metrics0().onRemove();
@@ -1343,7 +1339,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
- @Override public GridTuple3<Boolean, CacheObject, EntryProcessorResult<Object>> innerUpdateLocal(
+ @Override public GridTuple3<Boolean, Object, EntryProcessorResult<Object>> innerUpdateLocal(
GridCacheVersion ver,
GridCacheOperation op,
@Nullable Object writeObj,
@@ -1412,11 +1408,11 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
old = (CacheObject)cctx.kernalContext().portable().detachPortable(old, cctx);
if (old != null)
- updateIndex(old, null, expireTime, ver, null);
+ updateIndex(old, expireTime, ver, null);
else
clearIndex(null);
- update(old, null, expireTime, ttl, ver);
+ update(old, expireTime, ttl, ver);
}
// Apply metrics.
@@ -1434,7 +1430,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
if (expiryPlc != null && !readThrough && filter != cctx.noPeekArray() && hasValueUnlocked())
updateTtl(expiryPlc);
- return new T3<>(false, retval ? old : null, null);
+ return new T3<>(false, retval ? CU.value(old, cctx, false) : null, null);
}
}
@@ -1442,6 +1438,9 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
CacheObject updated;
+ Object key0 = null;
+ Object updated0 = null;
+
// Calculate new value.
if (op == GridCacheOperation.TRANSFORM) {
transformCloClsName = writeObj.getClass().getName();
@@ -1450,16 +1449,19 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
assert entryProcessor != null;
- // TODO IGNITE-51.
- CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry<>(cctx,
- key.value(cctx, false),
- old.value(cctx, false));
+ key0 = key.value(cctx, false);
+ old0 = value(old0, old, false);
+
+ CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry<>(cctx, key0, old0);
try {
Object computed = entryProcessor.process(entry, invokeArgs);
- if (entry.modified())
- updated = cctx.toCacheObject(cctx.unwrapTemporary(entry.getValue()));
+ if (entry.modified()) {
+ updated0 = cctx.unwrapTemporary(entry.getValue());
+
+ updated = cctx.toCacheObject(updated0);
+ }
else
updated = old;
@@ -1484,19 +1486,26 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
op = updated == null ? GridCacheOperation.DELETE : GridCacheOperation.UPDATE;
if (intercept) {
-// TODO IGNITE-51.
-// if (op == GridCacheOperation.UPDATE) {
-// updated = (V)cctx.config().getInterceptor().onBeforePut(key, old, updated);
-//
-// if (updated == null)
-// return new GridTuple3<>(false, cctx.<V>unwrapTemporary(old), invokeRes);
-// }
-// else {
-// interceptorRes = cctx.config().getInterceptor().onBeforeRemove(key, old);
-//
-// if (cctx.cancelRemove(interceptorRes))
-// return new GridTuple3<>(false, cctx.<V>unwrapTemporary(interceptorRes.get2()), invokeRes);
-// }
+ if (op == GridCacheOperation.UPDATE) {
+ key0 = value(key0, key, false);
+ updated0 = value(updated0, updated, false);
+ old0 = value(old0, old, false);
+
+ Object interceptorVal
+ = cctx.config().getInterceptor().onBeforePut(key0, old0, updated0);
+
+ if (interceptorVal == null)
+ return new GridTuple3<>(false, cctx.unwrapTemporary(old0), invokeRes);
+ }
+ else {
+ key0 = value(key0, key, false);
+ old0 = value(old0, old, false);
+
+ interceptorRes = cctx.config().getInterceptor().onBeforeRemove(key0, old0);
+
+ if (cctx.cancelRemove(interceptorRes))
+ return new GridTuple3<>(false, cctx.unwrapTemporary(interceptorRes.get2()), invokeRes);
+ }
}
boolean hadVal = hasValueUnlocked();
@@ -1535,11 +1544,11 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
// Update index inside synchronization since it can be updated
// in load methods without actually holding entry lock.
- updateIndex(updated, null, expireTime, ver, old);
+ updateIndex(updated, expireTime, ver, old);
assert ttl != CU.TTL_ZERO;
- update(updated, null, expireTime, ttl, ver);
+ update(updated, expireTime, ttl, ver);
if (evt) {
CacheObject evtOld = null;
@@ -1571,7 +1580,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
// in load methods without actually holding entry lock.
clearIndex(old);
- update(null, null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, ver);
+ update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, ver);
if (evt) {
CacheObject evtOld = null;
@@ -1603,15 +1612,14 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
if (intercept) {
if (op == GridCacheOperation.UPDATE)
- cctx.config().getInterceptor().onAfterPut(key, val);
+ cctx.config().getInterceptor().onAfterPut(key0, val);
else
- cctx.config().getInterceptor().onAfterRemove(key, old);
+ cctx.config().getInterceptor().onAfterRemove(key0, old0);
}
}
- // TODO IGNITE-51.
return new GridTuple3<>(res,
- cctx.<CacheObject>unwrapTemporary(interceptorRes != null ? interceptorRes.get2() : old),
+ cctx.unwrapTemporary(interceptorRes != null ? interceptorRes.get2() : CU.value(old, cctx, false)),
invokeRes);
}
@@ -1623,7 +1631,6 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
UUID affNodeId,
GridCacheOperation op,
@Nullable Object writeObj,
- @Nullable byte[] valBytes,
@Nullable Object[] invokeArgs,
boolean writeThrough,
boolean retval,
@@ -1814,7 +1821,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
(op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue()))) {
old0 = readThrough(null, key, false, subjId, taskName);
- oldVal = cctx.toCacheObject(oldVal);
+ oldVal = cctx.toCacheObject(old0);
readThrough = true;
@@ -1837,11 +1844,11 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
}
if (oldVal != null)
- updateIndex(oldVal, null, initExpireTime, ver, null);
+ updateIndex(oldVal, initExpireTime, ver, null);
else
clearIndex(null);
- update(oldVal, null, initExpireTime, initTtl, ver);
+ update(oldVal, initExpireTime, initTtl, ver);
if (deletedUnlocked() && oldVal != null && !isInternal())
deletedUnlocked(false);
@@ -1856,7 +1863,6 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
// Check filter inside of synchronization.
if (!F.isEmptyOrNulls(filter)) {
- // TODO IGNITE-51 can get key/value only once.
boolean pass = cctx.isAll(wrapFilterLocked(), filter);
if (!pass) {
@@ -1903,15 +1909,11 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
if (computed != null)
invokeRes = new CacheInvokeDirectResult(key,
cctx.toCacheObject(cctx.unwrapTemporary(computed)));
-
- valBytes = null;
}
catch (Exception e) {
invokeRes = new CacheInvokeDirectResult(key, e);
updated = oldVal;
-
- valBytes = oldValBytes.getIfMarshaled();
}
if (!entry.modified()) {
@@ -1981,7 +1983,6 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
newExpireTime = CU.EXPIRE_TIME_ETERNAL;
updated = null;
- valBytes = null;
}
else {
newSysExpireTime = CU.EXPIRE_TIME_CALCULATE;
@@ -2030,10 +2031,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
null,
null,
false);
- else if (interceptorVal != updated0) {
+ else if (interceptorVal != updated0)
updated = cctx.toCacheObject(cctx.unwrapTemporary(updated0));
- valBytes = null;
- }
}
// Try write-through.
@@ -2061,11 +2060,11 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
// Update index inside synchronization since it can be updated
// in load methods without actually holding entry lock.
- updateIndex(updated, valBytes, newExpireTime, newVer, oldVal);
+ updateIndex(updated, newExpireTime, newVer, oldVal);
- update(updated, valBytes, newExpireTime, newTtl, newVer);
+ update(updated, newExpireTime, newTtl, newVer);
- drReplicate(drType, updated, valBytes, newVer);
+ drReplicate(drType, updated, newVer);
recordNodeId(affNodeId);
@@ -2140,7 +2139,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
boolean hasValPtr = valPtr != 0;
// Clear value on backup. Entry will be removed from cache when it got evicted from queue.
- update(null, null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, newVer);
+ update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, newVer);
assert newSysTtl == CU.TTL_NOT_CHANGED;
assert newSysExpireTime == CU.EXPIRE_TIME_CALCULATE;
@@ -2155,7 +2154,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
recordNodeId(affNodeId);
- drReplicate(drType, null, null, newVer);
+ drReplicate(drType, null, newVer);
if (evt) {
CacheObject evtOld = null;
@@ -2304,11 +2303,10 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
*
* @param drType DR type.
* @param val Value.
- * @param valBytes Value bytes.
* @param ver Version.
* @throws IgniteCheckedException In case of exception.
*/
- private void drReplicate(GridDrType drType, @Nullable CacheObject val, @Nullable byte[] valBytes, GridCacheVersion ver)
+ private void drReplicate(GridDrType drType, @Nullable CacheObject val, GridCacheVersion ver)
throws IgniteCheckedException {
// TODO IGNITE-51.
// if (cctx.isDrEnabled() && drType != DR_NONE && !isInternal())
@@ -2456,7 +2454,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
if (cctx.deferredDelete() && !isStartVersion() && !detached() && !isInternal()) {
if (!deletedUnlocked()) {
- update(null, null, 0L, 0L, ver);
+ update(null, 0L, 0L, ver);
deletedUnlocked(true);
@@ -2528,7 +2526,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
obsoleteVersionExtras(obsoleteVer);
if (clear)
- value(null, null);
+ value(null);
}
return obsoleteVer != null;
@@ -2562,7 +2560,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
if (curVer == null || ver.equals(curVer)) {
CacheObject val = saveValueForIndexUnlocked();
- value(null, null);
+ value(null);
ver = newVer;
@@ -2666,13 +2664,11 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
/**
*
* @param val New value.
- * @param valBytes New value bytes.
* @param expireTime Expiration time.
* @param ttl Time to live.
* @param ver Update version.
*/
- protected final void update(@Nullable CacheObject val, @Nullable byte[] valBytes, long expireTime, long ttl,
- GridCacheVersion ver) {
+ protected final void update(@Nullable CacheObject val, long expireTime, long ttl, GridCacheVersion ver) {
assert ver != null;
assert Thread.holdsLock(this);
assert ttl != CU.TTL_ZERO && ttl != CU.TTL_NOT_CHANGED && ttl >= 0 : ttl;
@@ -2682,7 +2678,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
if (oldExpireTime != 0 && expireTime != oldExpireTime && cctx.config().isEagerTtl())
cctx.ttl().removeTrackedEntry(this);
- value(val, valBytes);
+ value(val);
ttlAndExpireTimeExtras(ttl, expireTime);
@@ -2986,9 +2982,9 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
val = (CacheObject)cctx.kernalContext().portable().detachPortable(val, cctx);
- updateIndex(val, null, expireTime, nextVer, old);
+ updateIndex(val, expireTime, nextVer, old);
- update(val, null, expireTime, ttlExtras(), nextVer);
+ update(val, expireTime, ttlExtras(), nextVer);
}
if (log.isDebugEnabled())
@@ -3238,7 +3234,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
@Override public synchronized CacheObject rawPut(CacheObject val, long ttl) {
CacheObject old = this.val;
- update(val, null, CU.toExpireTime(ttl), ttl, nextVersion());
+ update(val, CU.toExpireTime(ttl), ttl, nextVersion());
return old;
}
@@ -3247,7 +3243,6 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
@SuppressWarnings({"RedundantTypeArguments"})
@Override public boolean initialValue(
CacheObject val,
- byte[] valBytes,
GridCacheVersion ver,
long ttl,
long expireTime,
@@ -3267,15 +3262,15 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
val = (CacheObject)cctx.kernalContext().portable().detachPortable(val, cctx);
- if (val != null || valBytes != null)
- updateIndex(val, valBytes, expTime, ver, null);
+ if (val != null)
+ updateIndex(val, expTime, ver, null);
// Version does not change for load ops.
- update(val, valBytes, expTime, ttl, ver);
+ update(val, expTime, ttl, ver);
boolean skipQryNtf = false;
- if (val == null && valBytes == null) {
+ if (val == null) {
skipQryNtf = true;
if (cctx.deferredDelete() && !isInternal()) {
@@ -3287,7 +3282,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
else if (deletedUnlocked())
deletedUnlocked(false);
- drReplicate(drType, val, valBytes, ver);
+ drReplicate(drType, val, ver);
if (!skipQryNtf) {
if (cctx.isLocal() || cctx.isReplicated() || cctx.affinity().primary(cctx.localNode(), key, topVer))
@@ -3322,7 +3317,6 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
// Version does not change for load ops.
update(val,
- unswapped.valueBytes(),
unswapped.expireTime(),
unswapped.ttl(),
unswapped.version()
@@ -3365,14 +3359,14 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
val = (CacheObject)cctx.kernalContext().portable().detachPortable(val, cctx);
if (val != null) {
- updateIndex(val, null, expTime, newVer, old);
+ updateIndex(val, expTime, newVer, old);
if (deletedUnlocked())
deletedUnlocked(false);
}
// Version does not change for load ops.
- update(val, null, expTime, ttl, newVer);
+ update(val, expTime, ttl, newVer);
}
return true;
@@ -3584,7 +3578,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
if (expired) {
if (cctx.deferredDelete() && !detached() && !isInternal()) {
if (!deletedUnlocked()) {
- update(null, null, 0L, 0L, ver);
+ update(null, 0L, 0L, ver);
deletedUnlocked(true);
@@ -3785,25 +3779,23 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
* Updates cache index.
*
* @param val Value.
- * @param valBytes Value bytes.
* @param expireTime Expire time.
* @param ver New entry version.
* @param prevVal Previous value.
* @throws IgniteCheckedException If update failed.
*/
protected void updateIndex(@Nullable CacheObject val,
- @Nullable byte[] valBytes,
long expireTime,
GridCacheVersion ver,
@Nullable CacheObject prevVal) throws IgniteCheckedException {
assert Thread.holdsLock(this);
- assert val != null || valBytes != null : "null values in update index for key: " + key;
+ assert val != null : "null values in update index for key: " + key;
try {
GridCacheQueryManager qryMgr = cctx.queries();
if (qryMgr != null)
- qryMgr.store(key, null, val, valBytes, ver, expireTime);
+ qryMgr.store(key, null, val, null, ver, expireTime);
}
catch (IgniteCheckedException e) {
throw new GridCacheIndexUpdateException(e);
@@ -3920,7 +3912,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
clearIndex(prev);
// Nullify value after swap.
- value(null, null);
+ value(null);
marked = true;
@@ -3963,7 +3955,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
clearIndex(prevVal);
// Nullify value after swap.
- value(null, null);
+ value(null);
marked = true;
@@ -4018,7 +4010,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
valClsLdrId);
}
- value(null, null);
+ value(null);
}
}
catch (GridCacheEntryRemovedException ignored) {
@@ -4491,7 +4483,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
}
if (detached())
- return rawGet().value(cctx, false);
+ return CU.value(rawGet(), cctx, false);
for (;;) {
GridCacheEntryEx e = cctx.cache().peekEx(key);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
index a4d7384..fffa35f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
@@ -587,8 +587,12 @@ public abstract class GridCacheMessage implements Message {
int size = col.size();
- for (int i = 0 ; i < size; i++)
- col.get(i).prepareMarshal(ctx.cacheObjectContext());
+ for (int i = 0 ; i < size; i++) {
+ CacheObject obj = col.get(i);
+
+ if (obj != null)
+ obj.prepareMarshal(ctx.cacheObjectContext());
+ }
}
/**
@@ -601,8 +605,10 @@ public abstract class GridCacheMessage implements Message {
if (col == null)
return;
- for (CacheObject obj : col)
- obj.prepareMarshal(ctx.cacheObjectContext());
+ for (CacheObject obj : col) {
+ if (obj != null)
+ obj.prepareMarshal(ctx.cacheObjectContext());
+ }
}
/**
@@ -622,8 +628,12 @@ public abstract class GridCacheMessage implements Message {
int size = col.size();
- for (int i = 0 ; i < size; i++)
- col.get(i).finishUnmarshal(ctx, ldr);
+ for (int i = 0 ; i < size; i++) {
+ CacheObject obj = col.get(i);
+
+ if (obj != null)
+ obj.finishUnmarshal(ctx, ldr);
+ }
}
/**
@@ -640,8 +650,10 @@ public abstract class GridCacheMessage implements Message {
if (col == null)
return;
- for (CacheObject obj : col)
- obj.finishUnmarshal(ctx, ldr);
+ for (CacheObject obj : col) {
+ if (obj != null)
+ obj.finishUnmarshal(ctx, ldr);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index 4b21f8c..012d393 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -511,7 +511,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
// First check off-heap store.
if (readOffheap && offheapEnabled) {
- byte[] bytes = offheap.get(spaceName, part, key, key.valueBytes(cctx));
+ byte[] bytes = offheap.get(spaceName, part, key, keyBytes);
if (bytes != null)
return swapEntry(unmarshalSwapEntry(bytes));
@@ -522,7 +522,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
assert key != null;
- byte[] bytes = swapMgr.read(spaceName, new SwapKey(key, part, keyBytes), cctx.deploy().globalLoader());
+ byte[] bytes = swapMgr.read(spaceName,
+ new SwapKey(key.value(cctx, false), part, keyBytes),
+ cctx.deploy().globalLoader());
if (bytes == null && lsnr != null)
return lsnr.entry;
@@ -606,7 +608,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
final GridTuple<GridCacheSwapEntry> t = F.t1();
final GridTuple<IgniteCheckedException> err = F.t1();
- swapMgr.remove(spaceName, new SwapKey(key, part, key.valueBytes(cctx)), new CI1<byte[]>() {
+ swapMgr.remove(spaceName, new SwapKey(key.value(cctx, false), part, key.valueBytes(cctx)), new CI1<byte[]>() {
@Override public void apply(byte[] rmv) {
if (rmv != null) {
try {
@@ -727,7 +729,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
int part = cctx.affinity().partition(key);
- return read(key, CU.marshal(cctx.shared(), key), part, false, readOffheap, readSwap);
+ return read(key, key.valueBytes(cctx), part, false, readOffheap, readSwap);
}
/**
@@ -801,8 +803,11 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
if (unprocessedKeys == null)
unprocessedKeys = new ArrayList<>(keys.size());
- unprocessedKeys.add(
- new SwapKey(key, cctx.affinity().partition(key), CU.marshal(cctx.shared(), key)));
+ SwapKey swapKey = new SwapKey(key.value(cctx, false),
+ cctx.affinity().partition(key),
+ key.valueBytes(cctx));
+
+ unprocessedKeys.add(swapKey);
}
}
@@ -812,8 +817,13 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
else {
unprocessedKeys = new ArrayList<>(keys.size());
- for (KeyCacheObject key : keys)
- unprocessedKeys.add(new SwapKey(key, cctx.affinity().partition(key), CU.marshal(cctx.shared(), key)));
+ for (KeyCacheObject key : keys) {
+ SwapKey swapKey = new SwapKey(key.value(cctx, false),
+ cctx.affinity().partition(key),
+ key.valueBytes(cctx));
+
+ unprocessedKeys.add(swapKey);
+ }
}
assert swapEnabled;
@@ -960,7 +970,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
// First try offheap.
if (offheapEnabled) {
- byte[] val = offheap.remove(spaceName, part, key, key.valueBytes(cctx));
+ byte[] val = offheap.remove(spaceName, part, key.value(cctx, false), key.valueBytes(cctx));
if (val != null) {
if (c != null)
@@ -970,9 +980,12 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
}
}
- if (swapEnabled)
- swapMgr.remove(spaceName, new SwapKey(key, part, key.valueBytes(cctx)), c,
+ if (swapEnabled) {
+ swapMgr.remove(spaceName,
+ new SwapKey(key.value(cctx, false), part, key.valueBytes(cctx)),
+ c,
cctx.deploy().globalLoader());
+ }
}
/**
@@ -1060,8 +1073,13 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
else {
Map<SwapKey, byte[]> batch = new LinkedHashMap<>();
- for (GridCacheBatchSwapEntry entry : swapped)
- batch.put(new SwapKey(entry.key(), entry.partition(), entry.key().valueBytes(cctx)), entry.marshal());
+ for (GridCacheBatchSwapEntry entry : swapped) {
+ SwapKey swapKey = new SwapKey(entry.key().value(cctx, false),
+ entry.partition(),
+ entry.key().valueBytes(cctx));
+
+ batch.put(swapKey, entry.marshal());
+ }
swapMgr.writeAll(spaceName, batch, cctx.deploy().globalLoader());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
index ad146fe..8ab6dec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
@@ -41,6 +41,7 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheOb
*/
public KeyCacheObjectImpl(Object val, byte[] valBytes) {
assert val != null;
+ assert valBytes != null || this instanceof UserKeyCacheObjectImpl : this;
this.val = val;
this.valBytes = valBytes;
@@ -61,8 +62,8 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheOb
}
/** {@inheritDoc} */
- @Override public byte[] valueBytes(GridCacheContext ctx) {
- assert valBytes != null;
+ @Override public byte[] valueBytes(GridCacheContext ctx) throws IgniteCheckedException {
+ assert valBytes != null : this;
return valBytes;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserKeyCacheObjectImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserKeyCacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserKeyCacheObjectImpl.java
index 5c635a6..0b7bc2e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserKeyCacheObjectImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserKeyCacheObjectImpl.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache;
import org.apache.ignite.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
/**
* Cache object wrapping key provided by user. Need to be copied before stored in cache.
@@ -38,19 +39,29 @@ public class UserKeyCacheObjectImpl extends KeyCacheObjectImpl {
}
/** {@inheritDoc} */
+ @Override public byte[] valueBytes(GridCacheContext ctx) throws IgniteCheckedException {
+ if (valBytes == null)
+ valBytes = CU.marshal(ctx.shared(), val);
+
+ return valBytes;
+ }
+
+ /** {@inheritDoc} */
@Override public CacheObject prepareForCache(GridCacheContext ctx) {
- if (needCopy(ctx)) {
- try {
- if (valBytes == null)
- valBytes = ctx.marshaller().marshal(val);
+ try {
+ if (valBytes == null)
+ valBytes = ctx.marshaller().marshal(val);
- return new KeyCacheObjectImpl(ctx.marshaller().unmarshal(valBytes, ctx.deploy().globalLoader()), valBytes);
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException("Failed to marshal object: " + val, e);
+ if (needCopy(ctx)) {
+ Object val = ctx.marshaller().unmarshal(valBytes, ctx.deploy().globalLoader());
+
+ return new KeyCacheObjectImpl(val, valBytes);
}
- }
- else
+
return new KeyCacheObjectImpl(val, valBytes);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException("Failed to marshal object: " + val, e);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index 497af0a..8dbabc6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -74,7 +74,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Boolean> txLockAsync(
- Collection<? extends K> keys,
+ Collection<KeyCacheObject> keys,
long timeout,
IgniteTxLocalEx tx,
boolean isRead,
@@ -95,7 +95,15 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
IgniteTxLocalEx tx = ctx.tm().userTxx();
// Return value flag is true because we choose to bring values for explicit locks.
- return lockAllAsync(keys, timeout, tx, false, false, /*retval*/true, null, -1L, filter);
+ return lockAllAsync(ctx.cacheKeysView(keys),
+ timeout,
+ tx,
+ false,
+ false,
+ /*retval*/true,
+ null,
+ -1L,
+ filter);
}
/**
@@ -110,7 +118,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
* @param filter Optional filter.
* @return Future for locks.
*/
- protected abstract IgniteInternalFuture<Boolean> lockAllAsync(Collection<? extends K> keys,
+ protected abstract IgniteInternalFuture<Boolean> lockAllAsync(Collection<KeyCacheObject> keys,
long timeout,
@Nullable IgniteTxLocalEx tx,
boolean isInvalidate,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
index 3dc9e45..5648ad4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.processors.cache.transactions.*;
import org.apache.ignite.internal.processors.cache.version.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.plugin.extensions.communication.*;
import org.apache.ignite.transactions.*;
import org.jetbrains.annotations.*;
@@ -87,9 +88,13 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
@GridDirectTransient
private Map<IgniteTxKey, GridCacheVersion> dhtVers;
- /** Serialized map. */
- @GridToStringExclude
- private byte[] dhtVersBytes;
+ /** */
+ @GridDirectCollection(IgniteTxKey.class)
+ private Collection<IgniteTxKey> dhtVerKeys;
+
+ /** */
+ @GridDirectCollection(IgniteTxKey.class)
+ private Collection<GridCacheVersion> dhtVerVals;
/** Group lock key, if any. */
@GridToStringInclude
@@ -317,8 +322,16 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
if (grpLockKey != null && grpLockKeyBytes == null)
grpLockKeyBytes = ctx.marshaller().marshal(grpLockKey);
- if (dhtVers != null && dhtVersBytes == null)
- dhtVersBytes = ctx.marshaller().marshal(dhtVers);
+ if (dhtVers != null) {
+ for (IgniteTxKey key : dhtVers.keySet()) {
+ GridCacheContext cctx = ctx.cacheContext(key.cacheId());
+
+ key.prepareMarshal(cctx);
+ }
+
+ dhtVerKeys = dhtVers.keySet();
+ dhtVerVals = dhtVers.values();
+ }
if (txNodes != null)
txNodesBytes = ctx.marshaller().marshal(txNodes);
@@ -349,8 +362,23 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
if (grpLockKeyBytes != null && grpLockKey == null)
grpLockKey = ctx.marshaller().unmarshal(grpLockKeyBytes, ldr);
- if (dhtVersBytes != null && dhtVers == null)
- dhtVers = ctx.marshaller().unmarshal(dhtVersBytes, ldr);
+ if (dhtVerKeys != null && dhtVers == null) {
+ assert dhtVerVals != null;
+ assert dhtVerKeys.size() == dhtVerVals.size();
+
+ Iterator<IgniteTxKey> keyIt = dhtVerKeys.iterator();
+ Iterator<GridCacheVersion> verIt = dhtVerVals.iterator();
+
+ dhtVers = U.newHashMap(dhtVerKeys.size());
+
+ while (keyIt.hasNext()) {
+ IgniteTxKey key = keyIt.next();
+
+ key.finishUnmarshal(ctx.cacheContext(key.cacheId()), ldr);
+
+ dhtVers.put(key, verIt.next());
+ }
+ }
if (txNodesBytes != null)
txNodes = ctx.marshaller().unmarshal(txNodesBytes, ldr);
@@ -436,84 +464,90 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
writer.incrementState();
case 9:
- if (!writer.writeByteArray("dhtVersBytes", dhtVersBytes))
+ if (!writer.writeCollection("dhtVerKeys", dhtVerKeys, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 10:
- if (!writer.writeByteArray("grpLockKeyBytes", grpLockKeyBytes))
+ if (!writer.writeCollection("dhtVerVals", dhtVerVals, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 11:
- if (!writer.writeBoolean("invalidate", invalidate))
+ if (!writer.writeByteArray("grpLockKeyBytes", grpLockKeyBytes))
return false;
writer.incrementState();
case 12:
- if (!writer.writeByte("isolation", isolation != null ? (byte)isolation.ordinal() : -1))
+ if (!writer.writeBoolean("invalidate", invalidate))
return false;
writer.incrementState();
case 13:
- if (!writer.writeBoolean("onePhaseCommit", onePhaseCommit))
+ if (!writer.writeByte("isolation", isolation != null ? (byte)isolation.ordinal() : -1))
return false;
writer.incrementState();
case 14:
- if (!writer.writeBoolean("partLock", partLock))
+ if (!writer.writeBoolean("onePhaseCommit", onePhaseCommit))
return false;
writer.incrementState();
case 15:
- if (!writer.writeCollection("readsBytes", readsBytes, MessageCollectionItemType.BYTE_ARR))
+ if (!writer.writeBoolean("partLock", partLock))
return false;
writer.incrementState();
case 16:
- if (!writer.writeBoolean("sys", sys))
+ if (!writer.writeCollection("readsBytes", readsBytes, MessageCollectionItemType.BYTE_ARR))
return false;
writer.incrementState();
case 17:
- if (!writer.writeLong("threadId", threadId))
+ if (!writer.writeBoolean("sys", sys))
return false;
writer.incrementState();
case 18:
- if (!writer.writeLong("timeout", timeout))
+ if (!writer.writeLong("threadId", threadId))
return false;
writer.incrementState();
case 19:
- if (!writer.writeByteArray("txNodesBytes", txNodesBytes))
+ if (!writer.writeLong("timeout", timeout))
return false;
writer.incrementState();
case 20:
- if (!writer.writeInt("txSize", txSize))
+ if (!writer.writeByteArray("txNodesBytes", txNodesBytes))
return false;
writer.incrementState();
case 21:
- if (!writer.writeMessage("writeVer", writeVer))
+ if (!writer.writeInt("txSize", txSize))
return false;
writer.incrementState();
case 22:
+ if (!writer.writeMessage("writeVer", writeVer))
+ return false;
+
+ writer.incrementState();
+
+ case 23:
if (!writer.writeCollection("writesBytes", writesBytes, MessageCollectionItemType.BYTE_ARR))
return false;
@@ -548,7 +582,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
reader.incrementState();
case 9:
- dhtVersBytes = reader.readByteArray("dhtVersBytes");
+ dhtVerKeys = reader.readCollection("dhtVerKeys", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
return false;
@@ -556,7 +590,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
reader.incrementState();
case 10:
- grpLockKeyBytes = reader.readByteArray("grpLockKeyBytes");
+ dhtVerVals = reader.readCollection("dhtVerVals", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
return false;
@@ -564,7 +598,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
reader.incrementState();
case 11:
- invalidate = reader.readBoolean("invalidate");
+ grpLockKeyBytes = reader.readByteArray("grpLockKeyBytes");
if (!reader.isLastRead())
return false;
@@ -572,6 +606,14 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
reader.incrementState();
case 12:
+ invalidate = reader.readBoolean("invalidate");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 13:
byte isolationOrd;
isolationOrd = reader.readByte("isolation");
@@ -583,7 +625,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
reader.incrementState();
- case 13:
+ case 14:
onePhaseCommit = reader.readBoolean("onePhaseCommit");
if (!reader.isLastRead())
@@ -591,7 +633,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
reader.incrementState();
- case 14:
+ case 15:
partLock = reader.readBoolean("partLock");
if (!reader.isLastRead())
@@ -599,7 +641,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
reader.incrementState();
- case 15:
+ case 16:
readsBytes = reader.readCollection("readsBytes", MessageCollectionItemType.BYTE_ARR);
if (!reader.isLastRead())
@@ -607,7 +649,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
reader.incrementState();
- case 16:
+ case 17:
sys = reader.readBoolean("sys");
if (!reader.isLastRead())
@@ -615,7 +657,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
reader.incrementState();
- case 17:
+ case 18:
threadId = reader.readLong("threadId");
if (!reader.isLastRead())
@@ -623,7 +665,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
reader.incrementState();
- case 18:
+ case 19:
timeout = reader.readLong("timeout");
if (!reader.isLastRead())
@@ -631,7 +673,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
reader.incrementState();
- case 19:
+ case 20:
txNodesBytes = reader.readByteArray("txNodesBytes");
if (!reader.isLastRead())
@@ -639,7 +681,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
reader.incrementState();
- case 20:
+ case 21:
txSize = reader.readInt("txSize");
if (!reader.isLastRead())
@@ -647,7 +689,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
reader.incrementState();
- case 21:
+ case 22:
writeVer = reader.readMessage("writeVer");
if (!reader.isLastRead())
@@ -655,7 +697,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
reader.incrementState();
- case 22:
+ case 23:
writesBytes = reader.readCollection("writesBytes", MessageCollectionItemType.BYTE_ARR);
if (!reader.isLastRead())
@@ -675,7 +717,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 23;
+ return 24;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index c1000ea..3ce5cd3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -500,12 +500,11 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
if (!F.isEmpty(txEntry.entryProcessors()) || !F.isEmpty(txEntry.filters()))
txEntry.cached().unswap(true, false);
- GridTuple3<GridCacheOperation, CacheObject, byte[]> res =
+ IgniteBiTuple<GridCacheOperation, CacheObject> res =
applyTransformClosures(txEntry, false);
GridCacheOperation op = res.get1();
CacheObject val = res.get2();
- byte[] valBytes = res.get3();
GridCacheVersion explicitVer = txEntry.conflictVersion();
@@ -556,7 +555,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
near() ? null : explicitVer, CU.subjectId(this, cctx),
resolveTaskName());
else {
- cached.innerSet(this, eventNodeId(), nodeId, val, valBytes, false, false,
+ cached.innerSet(this, eventNodeId(), nodeId, val, false, false,
txEntry.ttl(), true, true, topVer, txEntry.filters(),
replicate ? DR_BACKUP : DR_NONE, txEntry.conflictExpireTime(),
near() ? null : explicitVer, CU.subjectId(this, cctx),
@@ -565,7 +564,6 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
// Keep near entry up to date.
if (nearCached != null) {
CacheObject val0 = null;
- byte[] valBytes0 = null;
GridCacheValueBytes valBytesTuple = cached.valueBytes();
@@ -579,8 +577,11 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
else
val0 = cached.rawGet();
- nearCached.updateOrEvict(xidVer, val0, valBytes0, cached.expireTime(),
- cached.ttl(), nodeId);
+ nearCached.updateOrEvict(xidVer,
+ val0,
+ cached.expireTime(),
+ cached.ttl(),
+ nodeId);
}
}
}
@@ -591,7 +592,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
// Keep near entry up to date.
if (nearCached != null)
- nearCached.updateOrEvict(xidVer, null, null, 0, 0, nodeId);
+ nearCached.updateOrEvict(xidVer, null, 0, 0, nodeId);
}
else if (op == RELOAD) {
CacheObject reloaded = cached.innerReload();
@@ -599,7 +600,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
if (nearCached != null) {
nearCached.innerReload();
- nearCached.updateOrEvict(cached.version(), reloaded, null,
+ nearCached.updateOrEvict(cached.version(), reloaded,
cached.expireTime(), cached.ttl(), nodeId);
}
}
@@ -621,7 +622,6 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
if (nearCached != null) {
CacheObject val0 = null;
- byte[] valBytes0 = null;
GridCacheValueBytes valBytesTuple = cached.valueBytes();
@@ -635,8 +635,11 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
else
val0 = cached.rawGet();
- nearCached.updateOrEvict(xidVer, val0, valBytes0, cached.expireTime(),
- cached.ttl(), nodeId);
+ nearCached.updateOrEvict(xidVer,
+ val0,
+ cached.expireTime(),
+ cached.ttl(),
+ nodeId);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 298e522..5febfcc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -458,7 +458,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
entry = entryEx(key, false);
- entry.initialValue(cacheVal, null, ver, ttl, CU.EXPIRE_TIME_CALCULATE, false, topVer,
+ entry.initialValue(cacheVal, ver, ttl, CU.EXPIRE_TIME_CALCULATE, false, topVer,
replicate ? DR_LOAD : DR_NONE);
}
catch (IgniteCheckedException e) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index 09f1629..6e4eac8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -311,22 +311,18 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
if (isNew() || !valid(-1) || deletedUnlocked())
return null;
else {
- CacheObject val0 = null;
- byte[] valBytes0 = null;
-
-// TODO IGNITE-51.
-// GridCacheValueBytes valBytesTuple = valueBytesUnlocked();
-//
-// if (!valBytesTuple.isNull()) {
-// if (valBytesTuple.isPlain())
-// val0 = (V)valBytesTuple.get();
-// else
-// valBytes0 = valBytesTuple.get();
-// }
-// else
-// val0 = val;
-
- return F.t(ver, val0, valBytes0);
+ CacheObject val0 = val;
+
+ if (val0 == null && valPtr != 0) {
+ IgniteBiTuple<byte[], Boolean> t = valueBytes0();
+
+ if (t.get2())
+ val0 = cctx.toCacheObject(t.get1(), null);
+ else
+ val0 = cctx.toCacheObject(null, t.get1());
+ }
+
+ return F.t(ver, val0, null);
}
}
@@ -563,7 +559,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
clearIndex(prev);
// Give to GC.
- update(null, null, 0L, 0L, ver);
+ update(null, 0L, 0L, ver);
if (swap) {
releaseSwap();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index 516b2bb..e9674c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -357,14 +357,14 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
expiryPlc,
skipVals);
}
-// TODO IGNITE-51.
-// else {
-// fut = tx.getAllAsync(cctx,
-// keys.keySet(),
-// null,
-// deserializePortable,
-// skipVals);
-// }
+ else {
+ fut = tx.getAllAsync(cctx,
+ keys.keySet(),
+ null,
+ false,
+ skipVals,
+ true);
+ }
}
}
else {
@@ -394,13 +394,12 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
expiryPlc, skipVals);
}
else {
- return null;
-// TODO IGNITE-51.
-// return tx.getAllAsync(cctx,
-// keys.keySet(),
-// null,
-// false,
-// skipVals);
+ return tx.getAllAsync(cctx,
+ keys.keySet(),
+ null,
+ false,
+ skipVals,
+ true);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index 126d641..056c3ad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -1075,7 +1075,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
try {
GridCacheEntryEx entry = cctx.cache().entryEx(info.key(), topVer);
- if (entry.initialValue(info.value(), null, info.version(), info.ttl(),
+ if (entry.initialValue(info.value(), info.version(), info.ttl(),
info.expireTime(), true, topVer, replicate ? DR_PRELOAD : DR_NONE)) {
if (rec && !entry.isInternal())
cctx.events().addEvent(entry.partition(), entry.key(), cctx.localNodeId(),
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 7c7e42c..449702b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -548,7 +548,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Boolean> lockAllAsync(
- @Nullable Collection<? extends K> keys,
+ @Nullable Collection<KeyCacheObject> keys,
long timeout,
IgniteTxLocalEx txx,
boolean isInvalidate,
@@ -583,7 +583,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
* @param filter Optional filter.
* @return Lock future.
*/
- public GridDhtFuture<Boolean> lockAllAsyncInternal(@Nullable Collection<? extends K> keys,
+ public GridDhtFuture<Boolean> lockAllAsyncInternal(@Nullable Collection<KeyCacheObject> keys,
long timeout,
IgniteTxLocalEx txx,
boolean isInvalidate,
@@ -612,16 +612,10 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
accessTtl,
filter);
- for (K key : keys) {
- if (key == null)
- continue;
-
- // TODO IGNITE-51.
- KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
-
+ for (KeyCacheObject key : keys) {
try {
while (true) {
- GridDhtCacheEntry entry = entryExx(cacheKey, tx.topologyVersion());
+ GridDhtCacheEntry entry = entryExx(key, tx.topologyVersion());
try {
fut.addEntry(entry);
[4/6] incubator-ignite git commit: # ignite-51
Posted by sb...@apache.org.
# ignite-51
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ea5bd46a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ea5bd46a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ea5bd46a
Branch: refs/heads/ignite-51
Commit: ea5bd46aebbe67357b2a20f58027d53a8323c17e
Parents: a040311 29508fa
Author: sboikov <sb...@gridgain.com>
Authored: Mon Mar 2 16:07:13 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Mar 2 16:25:32 2015 +0300
----------------------------------------------------------------------
.../distributed/near/GridNearAtomicCache.java | 5 +-
.../distributed/near/GridNearCacheAdapter.java | 5 +-
.../distributed/near/GridNearCacheEntry.java | 46 +++--
.../distributed/near/GridNearGetFuture.java | 27 +--
.../near/GridNearTransactionalCache.java | 7 +-
.../communication/GridCacheMessageSelfTest.java | 172 +++++++++++++++++++
6 files changed, 205 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea5bd46a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 4ebcced,a30f372..2358013
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@@ -367,12 -369,12 +367,15 @@@ public class GridNearAtomicCache<K, V>
if (F.isEmpty(keys))
return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap());
++ if (keyCheck)
++ validateCacheKeys(keys);
++
GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall();
subjId = ctx.subjectIdPerCall(subjId, prj);
return loadAsync(null,
-- keys,
++ ctx.cacheKeysView(keys),
false,
forcePrimary,
subjId,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea5bd46a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index b8f8c6f,a32b6a8..e34019b
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@@ -275,7 -275,7 +275,7 @@@ public abstract class GridNearCacheAdap
* @return Loaded values.
*/
public IgniteInternalFuture<Map<K, V>> loadAsync(@Nullable IgniteInternalTx tx,
-- @Nullable Collection<? extends K> keys,
++ @Nullable Collection<KeyCacheObject> keys,
boolean reload,
boolean forcePrimary,
@Nullable UUID subjId,
@@@ -287,9 -287,9 +287,6 @@@
if (F.isEmpty(keys))
return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap());
-- if (keyCheck)
-- validateCacheKeys(keys);
--
IgniteTxLocalEx txx = (tx != null && tx.local()) ? (IgniteTxLocalEx)tx : null;
final IgniteCacheExpiryPolicy expiry = expiryPolicy(expiryPlc);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea5bd46a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index 2d859a0,2f6bebb..911f77b
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@@ -273,22 -280,22 +273,18 @@@ public class GridNearCacheEntry extend
if (dhtVer == null)
return null;
else {
-- CacheObject val0 = null;
-- byte[] valBytes0 = null;
-
- // TODO IGNITE-51.
- // GridCacheValueBytes valBytesTuple = valueBytes();
- //
- // if (!valBytesTuple.isNull()) {
- // if (valBytesTuple.isPlain())
- // val0 = (V)valBytesTuple.get();
- // else
- // valBytes0 = valBytesTuple.get();
- // }
- // else
- // val0 = val;
-
- return F.t(dhtVer, val0, valBytes0);
++ CacheObject val0 = val;
+
-// TODO IGNITE-51.
-// GridCacheValueBytes valBytesTuple = valueBytes();
-//
-// if (!valBytesTuple.isNull()) {
-// if (valBytesTuple.isPlain())
-// val0 = (V)valBytesTuple.get();
-// else
-// valBytes0 = valBytesTuple.get();
-// }
-// else
-// val0 = val;
-
- return F.t(dhtVer, val0, valBytes0);
++ if (val0 == null && valPtr != 0) {
++ IgniteBiTuple<byte[], Boolean> t = valueBytes0();
++
++ if (t.get2())
++ val0 = cctx.toCacheObject(t.get1(), null);
++ else
++ val0 = cctx.toCacheObject(null, t.get1());
++ }
++
++ return F.t(ver, val0, null);
}
}
@@@ -320,17 -327,17 +316,15 @@@
/** {@inheritDoc} */
@Override protected Object readThrough(IgniteInternalTx tx, KeyCacheObject key, boolean reload,
UUID subjId, String taskName) throws IgniteCheckedException {
-- return null;
--// TODO IGNTIE-51.
--// return cctx.near().loadAsync(tx,
--// F.asList(key),
--// reload,
--// /*force primary*/false,
--// subjId,
--// taskName,
--// true,
--// null,
--// false).get().get(key);
++ return cctx.near().loadAsync(tx,
++ F.asList(key),
++ reload,
++ /*force primary*/false,
++ subjId,
++ taskName,
++ true,
++ null,
++ false).get().get(key.value(cctx, false));
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea5bd46a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index ef76366,ef76366..31481f2
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@@ -63,7 -63,7 +63,7 @@@ public final class GridNearGetFuture<K
private GridCacheContext<K, V> cctx;
/** Keys. */
-- private Collection<? extends K> keys;
++ private Collection<KeyCacheObject> keys;
/** Reload flag. */
private boolean reload;
@@@ -130,7 -130,7 +130,7 @@@
*/
public GridNearGetFuture(
GridCacheContext<K, V> cctx,
-- Collection<? extends K> keys,
++ Collection<KeyCacheObject> keys,
boolean readThrough,
boolean reload,
boolean forcePrimary,
@@@ -170,21 -170,21 +170,7 @@@
public void init() {
long topVer = tx == null ? cctx.affinity().affinityTopologyVersion() : tx.topologyVersion();
-- Collection<KeyCacheObject> keys0 = F.viewReadOnly(keys, new C1<K, KeyCacheObject>() {
-- @Override public KeyCacheObject apply(K key) {
-- if (key == null) {
-- NullPointerException err = new NullPointerException("Null key.");
--
-- onDone(err);
--
-- throw err;
-- }
--
-- return cctx.toCacheKeyObject(key);
-- }
-- });
--
-- map(keys0, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), topVer);
++ map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), topVer);
markInitialized();
}
@@@ -199,13 -199,13 +185,6 @@@
// Should not flip trackable flag from true to false since get future can be remapped.
}
-- /**
-- * @return Keys.
-- */
-- Collection<? extends K> keys() {
-- return keys;
-- }
--
/** {@inheritDoc} */
@Override public IgniteUuid futureId() {
return futId;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea5bd46a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index e35e0fd,6e88c4f..7c1243e
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@@ -111,6 -111,6 +111,9 @@@ public class GridNearTransactionalCache
if (F.isEmpty(keys))
return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap());
++ if (keyCheck)
++ validateCacheKeys(keys);
++
IgniteTxLocalAdapter tx = ctx.tm().threadLocalTx();
if (tx != null && !tx.implicit() && !skipTx) {
@@@ -131,7 -126,7 +134,7 @@@
subjId = ctx.subjectIdPerCall(subjId, prj);
return loadAsync(null,
-- keys,
++ ctx.cacheKeysView(keys),
false,
forcePrimary,
subjId,
@@@ -150,7 -145,7 +153,7 @@@
* @return Future.
*/
IgniteInternalFuture<Map<K, V>> txLoadAsync(GridNearTxLocal tx,
-- @Nullable Collection<? extends K> keys,
++ @Nullable Collection<KeyCacheObject> keys,
boolean readThrough,
boolean deserializePortable,
@Nullable IgniteCacheExpiryPolicy expiryPlc,
[2/6] incubator-ignite git commit: # ignite-51
Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 45b13b7..d2b7d36 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -464,24 +464,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
entry.cached(cached, null);
-// TODO IGNITE-51.
-// while (true) {
-// GridDhtCacheEntry cached = dhtCache.entryExx(entry.key(), topologyVersion());
-//
-// try {
-// // Set key bytes to avoid serializing in future.
-// cached.keyBytes(entry.keyBytes());
-//
-// entry.cached(cached, null);
-//
-// break;
-// }
-// catch (GridCacheEntryRemovedException ignore) {
-// if (log.isDebugEnabled())
-// log.debug("Got removed entry when adding to dht tx (will retry): " + cached);
-// }
-// }
-
GridCacheVersion explicit = entry.explicitVersion();
if (explicit != null) {
@@ -641,17 +623,15 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
GridDhtTransactionalCacheAdapter<?, ?> dhtCache = cacheCtx.isNear() ? cacheCtx.nearTx().dht() : cacheCtx.dhtTx();
- IgniteInternalFuture<Boolean> fut = null;
-// TODO IGNTIE-51
-// IgniteInternalFuture<Boolean> fut = dhtCache.lockAllAsyncInternal(passedKeys,
-// lockTimeout(),
-// this,
-// isInvalidate(),
-// read,
-// /*retval*/false,
-// isolation,
-// accessTtl,
-// CU.empty());
+ IgniteInternalFuture<Boolean> fut = dhtCache.lockAllAsyncInternal(passedKeys,
+ lockTimeout(),
+ this,
+ isInvalidate(),
+ read,
+ /*retval*/false,
+ isolation,
+ accessTtl,
+ (IgnitePredicate[])CU.empty());
return new GridEmbeddedFuture<>(
fut,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 369935b..7f9022a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -340,7 +340,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
}
if (err != null || procRes != null)
- ret.addEntryProcessResult(key,
+ ret.addEntryProcessResult(key.value(cacheCtx, false),
err == null ? new CacheInvokeResult<>(procRes) : new CacheInvokeResult<>(err));
else
ret.invokeResult(true);
@@ -1240,7 +1240,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
GridDrType drType = cacheCtx.isDrEnabled() ? GridDrType.DR_PRELOAD : GridDrType.DR_NONE;
try {
- if (entry.initialValue(info.value(), null, info.version(),
+ if (entry.initialValue(info.value(), info.version(),
info.ttl(), info.expireTime(), true, topVer, drType)) {
if (rec && !entry.isInternal())
cacheCtx.events().addEvent(entry.partition(), entry.key(), cctx.localNodeId(),
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
index 2423fe1..ec45af1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
@@ -328,73 +328,73 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
}
switch (writer.state()) {
- case 23:
+ case 24:
if (!writer.writeIgniteUuid("futId", futId))
return false;
writer.incrementState();
- case 24:
+ case 25:
if (!writer.writeBitSet("invalidateNearEntries", invalidateNearEntries))
return false;
writer.incrementState();
- case 25:
+ case 26:
if (!writer.writeBoolean("last", last))
return false;
writer.incrementState();
- case 26:
+ case 27:
if (!writer.writeIgniteUuid("miniId", miniId))
return false;
writer.incrementState();
- case 27:
+ case 28:
if (!writer.writeUuid("nearNodeId", nearNodeId))
return false;
writer.incrementState();
- case 28:
+ case 29:
if (!writer.writeCollection("nearWritesBytes", nearWritesBytes, MessageCollectionItemType.BYTE_ARR))
return false;
writer.incrementState();
- case 29:
+ case 30:
if (!writer.writeMessage("nearXidVer", nearXidVer))
return false;
writer.incrementState();
- case 30:
+ case 31:
if (!writer.writeByteArray("ownedBytes", ownedBytes))
return false;
writer.incrementState();
- case 31:
+ case 32:
if (!writer.writeBitSet("preloadKeys", preloadKeys))
return false;
writer.incrementState();
- case 32:
+ case 33:
if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
- case 33:
+ case 34:
if (!writer.writeInt("taskNameHash", taskNameHash))
return false;
writer.incrementState();
- case 34:
+ case 35:
if (!writer.writeLong("topVer", topVer))
return false;
@@ -416,7 +416,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
return false;
switch (reader.state()) {
- case 23:
+ case 24:
futId = reader.readIgniteUuid("futId");
if (!reader.isLastRead())
@@ -424,7 +424,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 24:
+ case 25:
invalidateNearEntries = reader.readBitSet("invalidateNearEntries");
if (!reader.isLastRead())
@@ -432,7 +432,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 25:
+ case 26:
last = reader.readBoolean("last");
if (!reader.isLastRead())
@@ -440,7 +440,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 26:
+ case 27:
miniId = reader.readIgniteUuid("miniId");
if (!reader.isLastRead())
@@ -448,7 +448,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 27:
+ case 28:
nearNodeId = reader.readUuid("nearNodeId");
if (!reader.isLastRead())
@@ -456,7 +456,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 28:
+ case 29:
nearWritesBytes = reader.readCollection("nearWritesBytes", MessageCollectionItemType.BYTE_ARR);
if (!reader.isLastRead())
@@ -464,7 +464,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 29:
+ case 30:
nearXidVer = reader.readMessage("nearXidVer");
if (!reader.isLastRead())
@@ -472,7 +472,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 30:
+ case 31:
ownedBytes = reader.readByteArray("ownedBytes");
if (!reader.isLastRead())
@@ -480,7 +480,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 31:
+ case 32:
preloadKeys = reader.readBitSet("preloadKeys");
if (!reader.isLastRead())
@@ -488,7 +488,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 32:
+ case 33:
subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
@@ -496,7 +496,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 33:
+ case 34:
taskNameHash = reader.readInt("taskNameHash");
if (!reader.isLastRead())
@@ -504,7 +504,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 34:
+ case 35:
topVer = reader.readLong("topVer");
if (!reader.isLastRead())
@@ -524,6 +524,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 35;
+ return 36;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 96777c9..603141b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -60,7 +60,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
private GridCacheContext<K, V> cctx;
/** Keys. */
- private Collection<? extends K> keys;
+ private Collection<KeyCacheObject> keys;
/** Topology version. */
private long topVer;
@@ -127,7 +127,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
*/
public GridPartitionedGetFuture(
GridCacheContext<K, V> cctx,
- Collection<? extends K> keys,
+ Collection<KeyCacheObject> keys,
long topVer,
boolean readThrough,
boolean reload,
@@ -167,21 +167,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
public void init() {
long topVer = this.topVer > 0 ? this.topVer : cctx.affinity().affinityTopologyVersion();
- Collection<KeyCacheObject> keys0 = F.viewReadOnly(keys, new C1<K, KeyCacheObject>() {
- @Override public KeyCacheObject apply(K key) {
- if (key == null) {
- NullPointerException err = new NullPointerException("Null key.");
-
- onDone(err);
-
- throw err;
- }
-
- return cctx.toCacheKeyObject(key);
- }
- });
-
- map(keys0, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), topVer);
+ map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), topVer);
markInitialized();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 5e38db3..adcaebf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -276,6 +276,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
final boolean deserializePortable,
final boolean skipVals
) {
+ ctx.checkSecurity(GridSecurityPermission.CACHE_READ);
+
+ if (F.isEmpty(keys))
+ return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap());
+
+ if (keyCheck)
+ validateCacheKeys(keys);
+
GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall();
subjId = ctx.subjectIdPerCall(null, prj);
@@ -286,7 +294,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
return asyncOp(new CO<IgniteInternalFuture<Map<K, V>>>() {
@Override public IgniteInternalFuture<Map<K, V>> apply() {
- return getAllAsync0(keys,
+ return getAllAsync0(ctx.cacheKeysView(keys),
false,
forcePrimary,
subjId0,
@@ -624,7 +632,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
/** {@inheritDoc} */
- @Override protected IgniteInternalFuture<Boolean> lockAllAsync(Collection<? extends K> keys,
+ @Override protected IgniteInternalFuture<Boolean> lockAllAsync(Collection<KeyCacheObject> keys,
long timeout,
@Nullable IgniteTxLocalEx tx,
boolean isInvalidate,
@@ -895,7 +903,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param skipVals Skip values flag.
* @return Get future.
*/
- private IgniteInternalFuture<Map<K, V>> getAllAsync0(@Nullable Collection<? extends K> keys,
+ private IgniteInternalFuture<Map<K, V>> getAllAsync0(@Nullable Collection<KeyCacheObject> keys,
boolean reload,
boolean forcePrimary,
UUID subjId,
@@ -903,14 +911,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
boolean deserializePortable,
@Nullable ExpiryPolicy expiryPlc,
boolean skipVals) {
- ctx.checkSecurity(GridSecurityPermission.CACHE_READ);
-
- if (F.isEmpty(keys))
- return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap());
-
- if (keyCheck)
- validateCacheKeys(keys);
-
long topVer = ctx.affinity().affinityTopologyVersion();
final IgniteCacheExpiryPolicy expiry = skipVals ? null : expiryPolicy(expiryPlc);
@@ -922,17 +922,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
boolean success = true;
// Optimistically expect that all keys are available locally (avoid creation of get future).
- for (K key : keys) {
- if (key == null)
- throw new NullPointerException("Null key.");
-
- KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
-
+ for (KeyCacheObject key : keys) {
GridCacheEntryEx entry = null;
while (true) {
try {
- entry = ctx.isSwapOrOffheapEnabled() ? entryEx(cacheKey) : peekEx(cacheKey);
+ entry = ctx.isSwapOrOffheapEnabled() ? entryEx(key) : peekEx(key);
// If our DHT cache do has value, then we peek it.
if (entry != null) {
@@ -956,12 +951,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
GridCacheVersion obsoleteVer = context().versions().next();
if (isNew && entry.markObsoleteIfEmpty(obsoleteVer))
- removeIfObsolete(cacheKey);
+ removeIfObsolete(key);
success = false;
}
else
- ctx.addResult(locVals, cacheKey, v, skipVals, false, deserializePortable, false);
+ ctx.addResult(locVals, key, v, skipVals, false, deserializePortable, false);
}
else
success = false;
@@ -998,7 +993,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (success) {
sendTtlUpdateRequest(expiry);
- return ctx.wrapCloneMap(new GridFinishedFuture<>(ctx.kernalContext(), locVals));
+ return new GridFinishedFuture<>(ctx.kernalContext(), locVals);
}
}
@@ -1020,7 +1015,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
fut.init();
- return ctx.wrapCloneMap(fut);
+ return fut;
}
/**
@@ -1700,7 +1695,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
locNodeId,
op,
writeVal,
- null,
req.invokeArguments(),
primary && writeThrough(),
req.returnValue(),
@@ -1965,7 +1959,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
op,
writeVal,
null,
- null,
false,
false,
expiry,
@@ -2460,7 +2453,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
nodeId,
op,
op == TRANSFORM ? entryProcessor : val,
- null,
op == TRANSFORM ? req.invokeArguments() : null,
/*write-through*/false,
/*retval*/false,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index e2b974c..a9a26c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -327,7 +327,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
GridCacheReturn ret = (GridCacheReturn)res;
- if (op != TRANSFORM) {
+ if (op != TRANSFORM && ret != null) {
CacheObject val = (CacheObject)ret.value();
ret.value(CU.value(val, cctx, false));
@@ -357,7 +357,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
if (res.remapKeys() != null) {
assert cctx.config().getAtomicWriteOrderMode() == PRIMARY;
- // TODO IGNITE-51.
mapOnTopology(res.remapKeys(), true, nodeId);
return;
@@ -809,7 +808,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
if (syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY)
- onDone(new GridCacheReturn<Object>(null, true));
+ onDone(new GridCacheReturn(null, true));
}
catch (IgniteCheckedException e) {
onDone(addFailedKeys(req.keys(), e));
@@ -928,8 +927,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
if (err0 == null)
err0 = this.err = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
- // TODO IGNITE-51.
- err0.add(failedKeys, err);
+ List<Object> keys = new ArrayList<>(failedKeys.size());
+
+ for (KeyCacheObject key : failedKeys)
+ keys.add(key.value(cctx, false));
+
+ err0.add(keys, err);
return err0;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 84b32ce..a22b31c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -183,12 +183,20 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
if (F.isEmpty(keys))
return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap());
+ if (keyCheck)
+ validateCacheKeys(keys);
+
IgniteTxLocalAdapter tx = ctx.tm().threadLocalTx();
if (tx != null && !tx.implicit() && !skipTx) {
return asyncOp(tx, new AsyncOp<Map<K, V>>(keys) {
@Override public IgniteInternalFuture<Map<K, V>> op(IgniteTxLocalAdapter tx) {
- return ctx.wrapCloneMap(tx.<K, V>getAllAsync(ctx, keys, entry, deserializePortable, skipVals));
+ return tx.getAllAsync(ctx,
+ ctx.cacheKeysView(keys),
+ entry,
+ deserializePortable,
+ skipVals,
+ false);
}
});
}
@@ -200,7 +208,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
subjId = ctx.subjectIdPerCall(subjId, prj);
return loadAsync(
- keys,
+ ctx.cacheKeysView(keys),
true,
false,
forcePrimary,
@@ -232,9 +240,10 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
* @param taskName Task name.
* @param deserializePortable Deserialize portable flag.
* @param expiryPlc Expiry policy.
+ * @param skipVals Skip values flag.
* @return Loaded values.
*/
- public IgniteInternalFuture<Map<K, V>> loadAsync(@Nullable Collection<? extends K> keys,
+ public IgniteInternalFuture<Map<K, V>> loadAsync(@Nullable Collection<KeyCacheObject> keys,
boolean readThrough,
boolean reload,
boolean forcePrimary,
@@ -248,9 +257,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
if (keys == null || keys.isEmpty())
return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap());
- if (keyCheck)
- validateCacheKeys(keys);
-
if (expiryPlc == null)
expiryPlc = expiryPolicy(null);
@@ -261,17 +267,12 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
boolean success = true;
// Optimistically expect that all keys are available locally (avoid creation of get future).
- for (K key : keys) {
- if (key == null)
- throw new NullPointerException("Null key.");
-
+ for (KeyCacheObject key : keys) {
GridCacheEntryEx entry = null;
- KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
-
while (true) {
try {
- entry = ctx.isSwapOrOffheapEnabled() ? entryEx(cacheKey) : peekEx(cacheKey);
+ entry = ctx.isSwapOrOffheapEnabled() ? entryEx(key) : peekEx(key);
// If our DHT cache do has value, then we peek it.
if (entry != null) {
@@ -295,12 +296,12 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
GridCacheVersion obsoleteVer = context().versions().next();
if (isNew && entry.markObsoleteIfEmpty(obsoleteVer))
- removeIfObsolete(cacheKey);
+ removeIfObsolete(key);
success = false;
}
else
- ctx.addResult(locVals, cacheKey, v, skipVals, false, deserializePortable, true);
+ ctx.addResult(locVals, key, v, skipVals, false, deserializePortable, true);
}
else
success = false;
@@ -337,7 +338,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
if (success) {
sendTtlUpdateRequest(expiryPlc);
- return ctx.wrapCloneMap(new GridFinishedFuture<>(ctx.kernalContext(), locVals));
+ return new GridFinishedFuture<>(ctx.kernalContext(), locVals);
}
}
@@ -360,7 +361,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
fut.init();
- return ctx.wrapCloneMap(fut);
+ return fut;
}
/**
@@ -369,7 +370,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
* {@inheritDoc}
*/
@Override public IgniteInternalFuture<Boolean> lockAllAsync(
- Collection<? extends K> keys,
+ Collection<KeyCacheObject> keys,
long timeout,
@Nullable IgniteTxLocalEx tx,
boolean isInvalidate,
@@ -426,7 +427,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
Collection<KeyCacheObject> locKeys = new ArrayList<>();
for (K key : keys) {
- // TODO IGNITE-51.
KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
GridDistributedCacheEntry entry = peekExx(cacheKey);
@@ -519,7 +519,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
* @param ver Lock version.
* @param keys Keys.
*/
- public void removeLocks(long threadId, GridCacheVersion ver, Collection<? extends K> keys) {
+ public void removeLocks(long threadId, GridCacheVersion ver, Collection<KeyCacheObject> keys) {
if (keys.isEmpty())
return;
@@ -530,11 +530,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
Collection<KeyCacheObject> locKeys = new LinkedList<>();
- for (K key : keys) {
- // TODO IGNITE-51.
- KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
-
- GridCacheMvccCandidate lock = ctx.mvcc().removeExplicitLock(threadId, cacheKey, ver);
+ for (KeyCacheObject key : keys) {
+ GridCacheMvccCandidate lock = ctx.mvcc().removeExplicitLock(threadId, key, ver);
if (lock != null) {
long topVer = lock.topologyVersion();
@@ -559,14 +556,14 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
req.version(ver);
}
- GridCacheEntryEx entry = peekEx(cacheKey);
+ GridCacheEntryEx entry = peekEx(key);
- KeyCacheObject key0 = entry != null ? entry.key() : cacheKey;
+ KeyCacheObject key0 = entry != null ? entry.key() : key;
req.addKey(key0, ctx);
}
else
- locKeys.add(cacheKey);
+ locKeys.add(key);
}
}
@@ -616,7 +613,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
final long threadId,
final GridCacheVersion ver,
final long topVer,
- final Collection<K> keys,
+ final Collection<KeyCacheObject> keys,
final boolean txRead,
final long timeout,
final long accessTtl,
@@ -624,9 +621,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
) {
assert keys != null;
- // TODO IGNITE-51.
- // IgniteInternalFuture<Object> keyFut = ctx.dht().dhtPreloader().request(keys, topVer);
- IgniteInternalFuture<Object> keyFut = null;
+ IgniteInternalFuture<Object> keyFut = ctx.dht().dhtPreloader().request(keys, topVer);
// Prevent embedded future creation if possible.
if (keyFut.isDone()) {
@@ -691,7 +686,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
long threadId,
final GridCacheVersion ver,
final long topVer,
- final Collection<K> keys,
+ final Collection<KeyCacheObject> keys,
final boolean txRead,
final long timeout,
final long accessTtl,
@@ -717,15 +712,12 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
boolean timedout = false;
- for (K key : keys) {
+ for (KeyCacheObject key : keys) {
if (timedout)
break;
- // TODO IGNITE-51.
- KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
-
while (true) {
- GridDhtCacheEntry entry = entryExx(cacheKey, topVer);
+ GridDhtCacheEntry entry = entryExx(key, topVer);
try {
fut.addEntry(key == null ? null : entry);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 7d007a1..29e9730 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -66,7 +66,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
private long threadId;
/** Keys to lock. */
- private Collection<? extends K> keys;
+ private Collection<KeyCacheObject> keys;
/** Future ID. */
private IgniteUuid futId;
@@ -133,7 +133,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
*/
public GridDhtColocatedLockFuture(
GridCacheContext<K, V> cctx,
- Collection<? extends K> keys,
+ Collection<KeyCacheObject> keys,
@Nullable GridNearTxLocal tx,
boolean read,
boolean retval,
@@ -535,8 +535,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
// Continue mapping on the same topology version as it was before.
topSnapshot.compareAndSet(null, snapshot);
- // TODO IGNITE-51.
- // map(keys);
+ map(keys);
markInitialized();
@@ -569,8 +568,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
topSnapshot.compareAndSet(null, snapshot);
- // TODO IGNITE-51.
- // map(keys);
+ map(keys);
markInitialized();
}
@@ -881,9 +879,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
threadId,
lockVer,
topVer,
- // TODO IGNITE-51.
- // keys,
- null,
+ keys,
read,
timeout,
accessTtl,
@@ -1244,7 +1240,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
}
// Set value to detached entry.
- entry.resetFromPrimary(newVal, null, dhtVer);
+ entry.resetFromPrimary(newVal, dhtVer);
if (log.isDebugEnabled())
log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
index 944034c..b9602d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
@@ -46,16 +46,12 @@ public class GridDhtDetachedCacheEntry extends GridDistributedCacheEntry {
* Sets value to detached entry so it can be retrieved in transactional gets.
*
* @param val Value.
- * @param valBytes Value bytes.
* @param ver Version.
* @throws IgniteCheckedException If value unmarshalling failed.
*/
- public void resetFromPrimary(CacheObject val, byte[] valBytes, GridCacheVersion ver)
+ public void resetFromPrimary(CacheObject val, GridCacheVersion ver)
throws IgniteCheckedException {
- if (valBytes != null && val == null)
- val = cctx.marshaller().unmarshal(valBytes, cctx.deploy().globalLoader());
-
- value(val, valBytes);
+ value(val);
this.ver = ver;
}
@@ -66,7 +62,7 @@ public class GridDhtDetachedCacheEntry extends GridDistributedCacheEntry {
}
/** {@inheritDoc} */
- @Override protected void value(@Nullable CacheObject val, @Nullable byte[] valBytes) {
+ @Override protected void value(@Nullable CacheObject val) {
this.val = val;
}
@@ -79,7 +75,7 @@ public class GridDhtDetachedCacheEntry extends GridDistributedCacheEntry {
}
/** {@inheritDoc} */
- @Override protected void updateIndex(CacheObject val, byte[] valBytes, long expireTime,
+ @Override protected void updateIndex(CacheObject val, long expireTime,
GridCacheVersion ver, CacheObject old) throws IgniteCheckedException {
// No-op for detached entries, index is updated on primary nodes.
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
index 66efe48..1338788 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
@@ -503,7 +503,6 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
try {
if (entry.initialValue(
info.value(),
- null,
info.version(),
info.ttl(),
info.expireTime(),
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
index 6ebd8d2..4e5beed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
@@ -506,7 +506,6 @@ public class GridDhtPartitionDemandPool<K, V> {
if (preloadPred == null || preloadPred.apply(entry)) {
if (cached.initialValue(
entry.value(),
- null,
entry.version(),
entry.ttl(),
entry.expireTime(),
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index a30f372..4ebcced 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -219,7 +219,6 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
nodeId,
op,
val,
- valBytes,
null,
/*write-through*/false,
/*retval*/false,
@@ -316,7 +315,6 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
nodeId,
op,
op == TRANSFORM ? entryProcessor : val,
- null,
op == TRANSFORM ? req.invokeArguments() : null,
/*write-through*/false,
/*retval*/false,
@@ -650,7 +648,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
}
/** {@inheritDoc} */
- @Override protected IgniteInternalFuture<Boolean> lockAllAsync(Collection<? extends K> keys,
+ @Override protected IgniteInternalFuture<Boolean> lockAllAsync(Collection<KeyCacheObject> keys,
long timeout,
@Nullable IgniteTxLocalEx tx,
boolean isInvalidate,
@@ -659,7 +657,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
@Nullable TransactionIsolation isolation,
long accessTtl,
IgnitePredicate<Cache.Entry<K, V>>[] filter) {
- return dht.lockAllAsync(keys, timeout, filter);
+ return dht.lockAllAsync(null, timeout, filter);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index a32b6a8..b8f8c6f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -309,7 +309,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
// init() will register future for responses if future has remote mappings.
fut.init();
- return ctx.wrapCloneMap(fut);
+ return fut;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index 2f6bebb..2d859a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -138,7 +138,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
if (isNew() || !valid(topVer)) {
// Version does not change for load ops.
- update(e.value(), null, e.expireTime(), e.ttl(), e.isNew() ? ver : e.version());
+ update(e.value(), e.expireTime(), e.ttl(), e.isNew() ? ver : e.version());
if (cctx.deferredDelete()) {
boolean deleted = val == null;
@@ -176,7 +176,6 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
* This method should be called only when lock is owned on this entry.
*
* @param val Value.
- * @param valBytes Value bytes.
* @param ver Version.
* @param dhtVer DHT version.
* @param primaryNodeId Primary node ID.
@@ -185,27 +184,23 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings( {"RedundantTypeArguments"})
- public boolean resetFromPrimary(CacheObject val, byte[] valBytes, GridCacheVersion ver, GridCacheVersion dhtVer,
- UUID primaryNodeId) throws GridCacheEntryRemovedException, IgniteCheckedException {
+ public boolean resetFromPrimary(CacheObject val,
+ GridCacheVersion ver,
+ GridCacheVersion dhtVer,
+ UUID primaryNodeId)
+ throws GridCacheEntryRemovedException, IgniteCheckedException
+ {
assert dhtVer != null;
cctx.versions().onReceived(primaryNodeId, dhtVer);
-// TODO IGNITE-51.
-// if (valBytes != null && val == null && !cctx.config().isStoreValueBytes()) {
-// GridCacheVersion curDhtVer = dhtVersion();
-//
-// if (!F.eq(dhtVer, curDhtVer))
-// val = cctx.marshaller().<V>unmarshal(valBytes, cctx.deploy().globalLoader());
-// }
-
synchronized (this) {
checkObsolete();
this.primaryNodeId = primaryNodeId;
if (!F.eq(this.dhtVer, dhtVer)) {
- value(val, valBytes);
+ value(val);
this.ver = ver;
this.dhtVer = dhtVer;
@@ -222,14 +217,12 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
*
* @param dhtVer DHT version.
* @param val Value associated with version.
- * @param valBytes Value bytes.
* @param expireTime Expire time.
* @param ttl Time to live.
* @param primaryNodeId Primary node ID.
*/
public void updateOrEvict(GridCacheVersion dhtVer,
@Nullable CacheObject val,
- @Nullable byte[] valBytes,
long expireTime,
long ttl,
UUID primaryNodeId)
@@ -248,7 +241,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
// If cannot evict, then update.
if (this.dhtVer == null) {
if (!markObsolete(dhtVer)) {
- value(val, valBytes);
+ value(val);
ttlAndExpireTimeExtras((int) ttl, expireTime);
@@ -396,7 +389,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
// Change entry only if dht version has changed.
if (!dhtVer.equals(dhtVersion())) {
- update(val, valBytes, expireTime, ttl, ver);
+ update(val, expireTime, ttl, ver);
if (cctx.deferredDelete()) {
boolean deleted = val == null && valBytes == null;
@@ -429,7 +422,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
}
/** {@inheritDoc} */
- @Override protected void updateIndex(CacheObject val, byte[] valBytes, long expireTime,
+ @Override protected void updateIndex(CacheObject val, long expireTime,
GridCacheVersion ver, CacheObject old) throws IgniteCheckedException {
// No-op: queries are disabled for near cache.
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index b880966..d89b59d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -1029,7 +1029,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
// Lock is held at this point, so we can set the
// returned value if any.
- entry.resetFromPrimary(newVal, null, lockVer, dhtVer, node.id());
+ entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id());
entry.readyNearLock(lockVer, mappedVer, res.committedVersions(),
res.rolledbackVersions(), res.pending());
@@ -1387,7 +1387,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
// Lock is held at this point, so we can set the
// returned value if any.
- entry.resetFromPrimary(newVal, null, lockVer, dhtVer, node.id());
+ entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id());
if (inTx() && implicitTx() && tx.onePhaseCommit()) {
boolean pass = res.filterResult(i);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index 6e88c4f..e35e0fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -116,7 +116,12 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
if (tx != null && !tx.implicit() && !skipTx) {
return asyncOp(tx, new AsyncOp<Map<K, V>>(keys) {
@Override public IgniteInternalFuture<Map<K, V>> op(IgniteTxLocalAdapter tx) {
- return ctx.wrapCloneMap(tx.<K, V>getAllAsync(ctx, keys, entry, deserializePortable, skipVals));
+ return tx.getAllAsync(ctx,
+ ctx.cacheKeysView(keys),
+ entry,
+ deserializePortable,
+ skipVals,
+ false);
}
});
}
@@ -400,7 +405,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
/** {@inheritDoc} */
@Override protected IgniteInternalFuture<Boolean> lockAllAsync(
- Collection<? extends K> keys,
+ Collection<KeyCacheObject> keys,
long timeout,
IgniteTxLocalEx tx,
boolean isInvalidate,
@@ -411,9 +416,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
IgnitePredicate<Cache.Entry<K, V>>[] filter
) {
GridNearLockFuture<K, V> fut = new GridNearLockFuture<>(ctx,
- // TODO IGNITE-51
- // keys,
- null,
+ keys,
(GridNearTxLocal)tx,
isRead,
retval,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index d899562..8d6800d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -280,7 +280,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Boolean> loadMissing(
- GridCacheContext cacheCtx,
+ final GridCacheContext cacheCtx,
boolean readThrough,
boolean async,
final Collection<KeyCacheObject> keys,
@@ -288,69 +288,67 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
boolean skipVals,
final IgniteBiInClosure<KeyCacheObject, Object> c
) {
- return null;
-// TODO IGNITE-51.
-// if (cacheCtx.isNear()) {
-// return cacheCtx.nearTx().txLoadAsync(this,
-// keys,
-// readThrough,
-// deserializePortable,
-// accessPolicy(cacheCtx, keys),
-// skipVals).chain(new C1<IgniteInternalFuture<Map<K, V>>, Boolean>() {
-// @Override public Boolean apply(IgniteInternalFuture<Map<K, V>> f) {
-// try {
-// Map<K, V> map = f.get();
-//
-// // Must loop through keys, not map entries,
-// // as map entries may not have all the keys.
-// for (K key : keys)
-// c.apply(key, map.get(key));
-//
-// return true;
-// }
-// catch (Exception e) {
-// setRollbackOnly();
-//
-// throw new GridClosureException(e);
-// }
-// }
-// });
-// }
-// else if (cacheCtx.isColocated()) {
-// return cacheCtx.colocated().loadAsync(keys,
-// readThrough,
-// /*reload*/false,
-// /*force primary*/false,
-// topologyVersion(),
-// CU.subjectId(this, cctx),
-// resolveTaskName(),
-// deserializePortable,
-// accessPolicy(cacheCtx, keys),
-// skipVals).chain(new C1<IgniteInternalFuture<Map<K, V>>, Boolean>() {
-// @Override public Boolean apply(IgniteInternalFuture<Map<K, V>> f) {
-// try {
-// Map<K, V> map = f.get();
-//
-// // Must loop through keys, not map entries,
-// // as map entries may not have all the keys.
-// for (K key : keys)
-// c.apply(key, map.get(key));
-//
-// return true;
-// }
-// catch (Exception e) {
-// setRollbackOnly();
-//
-// throw new GridClosureException(e);
-// }
-// }
-// });
-// }
-// else {
-// assert cacheCtx.isLocal();
-//
-// return super.loadMissing(cacheCtx, readThrough, async, keys, deserializePortable, skipVals, c);
-// }
+ if (cacheCtx.isNear()) {
+ return cacheCtx.nearTx().txLoadAsync(this,
+ keys,
+ readThrough,
+ deserializePortable,
+ accessPolicy(cacheCtx, keys),
+ skipVals).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Boolean>() {
+ @Override public Boolean apply(IgniteInternalFuture<Map<Object, Object>> f) {
+ try {
+ Map<Object, Object> map = f.get();
+
+ // Must loop through keys, not map entries,
+ // as map entries may not have all the keys.
+ for (KeyCacheObject key : keys)
+ c.apply(key, map.get(key.value(cacheCtx, false)));
+
+ return true;
+ }
+ catch (Exception e) {
+ setRollbackOnly();
+
+ throw new GridClosureException(e);
+ }
+ }
+ });
+ }
+ else if (cacheCtx.isColocated()) {
+ return cacheCtx.colocated().loadAsync(keys,
+ readThrough,
+ /*reload*/false,
+ /*force primary*/false,
+ topologyVersion(),
+ CU.subjectId(this, cctx),
+ resolveTaskName(),
+ deserializePortable,
+ accessPolicy(cacheCtx, keys),
+ skipVals).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Boolean>() {
+ @Override public Boolean apply(IgniteInternalFuture<Map<Object, Object>> f) {
+ try {
+ Map<Object, Object> map = f.get();
+
+ // Must loop through keys, not map entries,
+ // as map entries may not have all the keys.
+ for (KeyCacheObject key : keys)
+ c.apply(key, map.get(key.value(cacheCtx, false)));
+
+ return true;
+ }
+ catch (Exception e) {
+ setRollbackOnly();
+
+ throw new GridClosureException(e);
+ }
+ }
+ });
+ }
+ else {
+ assert cacheCtx.isLocal();
+
+ return super.loadMissing(cacheCtx, readThrough, async, keys, deserializePortable, skipVals, c);
+ }
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
index 9af91c6..491a171 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
@@ -949,7 +949,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
IgniteBiTuple<GridCacheVersion, CacheObject> tup = entry.getValue();
- nearEntry.resetFromPrimary(tup.get2(), null, tx.xidVersion(),
+ nearEntry.resetFromPrimary(tup.get2(), tx.xidVersion(),
tup.get1(), m.node().id());
}
else if (txEntry.cached().detached()) {
@@ -957,7 +957,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
IgniteBiTuple<GridCacheVersion, CacheObject> tup = entry.getValue();
- detachedEntry.resetFromPrimary(tup.get2(), null, tx.xidVersion());
+ detachedEntry.resetFromPrimary(tup.get2(), tx.xidVersion());
}
break;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
index 42e8600..e4111ad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
@@ -257,61 +257,61 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
}
switch (writer.state()) {
- case 23:
+ case 24:
if (!writer.writeIgniteUuid("futId", futId))
return false;
writer.incrementState();
- case 24:
+ case 25:
if (!writer.writeBoolean("implicitSingle", implicitSingle))
return false;
writer.incrementState();
- case 25:
+ case 26:
if (!writer.writeBoolean("last", last))
return false;
writer.incrementState();
- case 26:
+ case 27:
if (!writer.writeCollection("lastBackups", lastBackups, MessageCollectionItemType.UUID))
return false;
writer.incrementState();
- case 27:
+ case 28:
if (!writer.writeIgniteUuid("miniId", miniId))
return false;
writer.incrementState();
- case 28:
+ case 29:
if (!writer.writeBoolean("near", near))
return false;
writer.incrementState();
- case 29:
+ case 30:
if (!writer.writeBoolean("retVal", retVal))
return false;
writer.incrementState();
- case 30:
+ case 31:
if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
- case 31:
+ case 32:
if (!writer.writeInt("taskNameHash", taskNameHash))
return false;
writer.incrementState();
- case 32:
+ case 33:
if (!writer.writeLong("topVer", topVer))
return false;
@@ -333,7 +333,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
return false;
switch (reader.state()) {
- case 23:
+ case 24:
futId = reader.readIgniteUuid("futId");
if (!reader.isLastRead())
@@ -341,7 +341,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 24:
+ case 25:
implicitSingle = reader.readBoolean("implicitSingle");
if (!reader.isLastRead())
@@ -349,7 +349,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 25:
+ case 26:
last = reader.readBoolean("last");
if (!reader.isLastRead())
@@ -357,7 +357,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 26:
+ case 27:
lastBackups = reader.readCollection("lastBackups", MessageCollectionItemType.UUID);
if (!reader.isLastRead())
@@ -365,7 +365,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 27:
+ case 28:
miniId = reader.readIgniteUuid("miniId");
if (!reader.isLastRead())
@@ -373,7 +373,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 28:
+ case 29:
near = reader.readBoolean("near");
if (!reader.isLastRead())
@@ -381,7 +381,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 29:
+ case 30:
retVal = reader.readBoolean("retVal");
if (!reader.isLastRead())
@@ -389,7 +389,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 30:
+ case 31:
subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
@@ -397,7 +397,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 31:
+ case 32:
taskNameHash = reader.readInt("taskNameHash");
if (!reader.isLastRead())
@@ -405,7 +405,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 32:
+ case 33:
topVer = reader.readLong("topVer");
if (!reader.isLastRead())
@@ -425,7 +425,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 33;
+ return 34;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
index 273a184..78cfb73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
@@ -224,14 +224,16 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
- GridCacheContext cctx = ctx.cacheContext(cacheId);
-
if (ownedVals != null && ownedValsBytes == null) {
ownedValsBytes = new ArrayList<>(ownedVals.size());
for (Map.Entry<IgniteTxKey, IgniteBiTuple<GridCacheVersion, CacheObject>> entry : ownedVals.entrySet()) {
IgniteBiTuple<GridCacheVersion, CacheObject> tup = entry.getValue();
+ GridCacheContext cctx = ctx.cacheContext(entry.getKey().cacheId());
+
+ entry.getKey().prepareMarshal(cctx);
+
CacheObject val = tup.get2();
if (val != null)
@@ -245,8 +247,11 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
retValBytes = ctx.marshaller().marshal(retVal);
if (filterFailedKeys != null) {
- for (IgniteTxKey key :filterFailedKeys)
+ for (IgniteTxKey key :filterFailedKeys) {
+ GridCacheContext cctx = ctx.cacheContext(key.cacheId());
+
key.prepareMarshal(cctx);
+ }
}
}
@@ -254,8 +259,6 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
@Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);
- GridCacheContext cctx = ctx.cacheContext(cacheId);
-
if (ownedValsBytes != null && ownedVals == null) {
ownedVals = new HashMap<>();
@@ -264,6 +267,10 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
CacheObject val = tup.get3();
+ GridCacheContext cctx = ctx.cacheContext(tup.get1().cacheId());
+
+ tup.get1().finishUnmarshal(cctx, ldr);
+
if (val != null)
val.finishUnmarshal(cctx, ldr);
@@ -275,8 +282,11 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
retVal = ctx.marshaller().unmarshal(retValBytes, ldr);
if (filterFailedKeys != null) {
- for (IgniteTxKey key :filterFailedKeys)
+ for (IgniteTxKey key :filterFailedKeys) {
+ GridCacheContext cctx = ctx.cacheContext(key.cacheId());
+
key.finishUnmarshal(cctx, ldr);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
index 7a0c5d8..4c59437 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
@@ -104,7 +104,7 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> {
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<Boolean> txLockAsync(Collection<? extends K> keys,
+ @Override public IgniteInternalFuture<Boolean> txLockAsync(Collection<KeyCacheObject> keys,
long timeout,
IgniteTxLocalEx tx,
boolean isRead,
@@ -121,7 +121,7 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> {
IgnitePredicate<Cache.Entry<K, V>>[] filter) {
IgniteTxLocalEx tx = ctx.tm().localTx();
- return lockAllAsync(keys, timeout, tx, filter);
+ return lockAllAsync(ctx.cacheKeysView(keys), timeout, tx, filter);
}
/**
@@ -131,7 +131,7 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> {
* @param filter Filter.
* @return Future.
*/
- public IgniteInternalFuture<Boolean> lockAllAsync(Collection<? extends K> keys,
+ public IgniteInternalFuture<Boolean> lockAllAsync(Collection<KeyCacheObject> keys,
long timeout,
@Nullable IgniteTxLocalEx tx,
IgnitePredicate<Cache.Entry<K, V>>[] filter) {
@@ -141,12 +141,12 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> {
GridLocalLockFuture<K, V> fut = new GridLocalLockFuture<>(ctx, keys, tx, this, timeout, filter);
try {
- for (K key : keys) {
+ for (KeyCacheObject key : keys) {
while (true) {
GridLocalCacheEntry entry = null;
try {
- entry = entryExx(ctx.toCacheKeyObject(key));
+ entry = entryExx(key);
if (!ctx.isAll(entry, filter)) {
fut.onFailed();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
index b2797a4..4e769e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
@@ -107,7 +107,7 @@ public final class GridLocalLockFuture<K, V> extends GridFutureAdapter<Boolean>
*/
GridLocalLockFuture(
GridCacheContext<K, V> cctx,
- Collection<? extends K> keys,
+ Collection<KeyCacheObject> keys,
IgniteTxLocalEx tx,
GridLocalCache<K, V> cache,
long timeout,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index 833f8c9..8aa6a63 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -1014,7 +1014,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
try {
entry = entryEx(cacheKey);
- GridTuple3<Boolean, CacheObject, EntryProcessorResult<Object>> t = entry.innerUpdateLocal(
+ GridTuple3<Boolean, Object, EntryProcessorResult<Object>> t = entry.innerUpdateLocal(
ver,
val == null ? DELETE : op,
val,
@@ -1452,7 +1452,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
assert writeVal != null || op == DELETE : "null write value found.";
- GridTuple3<Boolean, CacheObject, EntryProcessorResult<Object>> t = entry.innerUpdateLocal(
+ GridTuple3<Boolean, Object, EntryProcessorResult<Object>> t = entry.innerUpdateLocal(
ver,
op,
writeVal,
@@ -1468,12 +1468,12 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
taskName);
if (intercept) {
- if (op == UPDATE)
+ if (op == UPDATE) {
ctx.config().getInterceptor().onAfterPut(entry.key().value(ctx, false),
- writeVal.value(ctx, false));
+ writeVal.value(ctx, false));
+ }
else
- ctx.config().getInterceptor().onAfterRemove(entry.key().value(ctx, false),
- CU.value(t.get2(), ctx, false));
+ ctx.config().getInterceptor().onAfterRemove(entry.key().value(ctx, false), t.get2());
}
}
catch (GridCacheEntryRemovedException ignore) {
@@ -1555,7 +1555,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<Boolean> txLockAsync(Collection<? extends K> keys,
+ @Override public IgniteInternalFuture<Boolean> txLockAsync(Collection<KeyCacheObject> keys,
long timeout,
IgniteTxLocalEx tx,
boolean isRead,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index c26ca4b..563d38c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -1193,7 +1193,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
* @throws IgniteCheckedException If failed to get previous value for transform.
* @throws GridCacheEntryRemovedException If entry was concurrently deleted.
*/
- protected GridTuple3<GridCacheOperation, CacheObject, byte[]> applyTransformClosures(
+ protected IgniteBiTuple<GridCacheOperation, CacheObject> applyTransformClosures(
IgniteTxEntry txEntry,
boolean metrics) throws GridCacheEntryRemovedException, IgniteCheckedException {
GridCacheContext cacheCtx = txEntry.context();
@@ -1201,10 +1201,10 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
assert cacheCtx != null;
if (isSystemInvalidate())
- return F.t(cacheCtx.writeThrough() ? RELOAD : DELETE, null, null);
+ return F.t(cacheCtx.writeThrough() ? RELOAD : DELETE, null);
if (F.isEmpty(txEntry.entryProcessors()))
- return F.t(txEntry.op(), txEntry.value(), null);
+ return F.t(txEntry.op(), txEntry.value());
else {
try {
boolean recordEvt = cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ);
@@ -1246,14 +1246,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
modified |= invokeEntry.modified();
}
- if (modified) {
- cacheVal = cacheCtx.toCacheObject(val);
-// TODO IGNITE-51
-// val = (V)cacheCtx.<V>unwrapTemporary(val);
-//
-// if (cacheCtx.portableEnabled())
-// val = (V)cacheCtx.marshalToPortable(val);
- }
+ if (modified)
+ cacheVal = cacheCtx.toCacheObject(cacheCtx.unwrapTemporary(val));
GridCacheOperation op = modified ? (val == null ? DELETE : UPDATE) : NOOP;
@@ -1270,7 +1264,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
}
}
- return F.t(op, cacheVal, null);
+ return F.t(op, cacheVal);
}
catch (GridCacheFilterFailedException e) {
assert false : "Empty filter failed for innerGet: " + e;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 6b1881d..66eea11 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -1044,7 +1044,8 @@ public class IgniteTxEntry implements GridPeerDeployAware, Externalizable, Optim
public void readFrom(ObjectInput in) throws IOException, ClassNotFoundException {
hasWriteVal = in.readBoolean();
- val = (CacheObject)in.readObject();
+ if (hasWriteVal)
+ val = (CacheObject)in.readObject();
op = fromOrdinal(in.readInt());
// TODO IGNITE-51.
[6/6] incubator-ignite git commit: # ignite-51 minor
Posted by sb...@apache.org.
# ignite-51 minor
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7f9a6630
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7f9a6630
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7f9a6630
Branch: refs/heads/ignite-51
Commit: 7f9a6630b9480c580b4312a44e20eeac866fe4a0
Parents: 810be9a
Author: sboikov <sb...@gridgain.com>
Authored: Mon Mar 2 16:29:23 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Mar 2 16:29:23 2015 +0300
----------------------------------------------------------------------
.../spi/communication/GridCacheMessageSelfTest.java | 12 ++++++++----
1 file changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7f9a6630/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
index c011bbc..226e35b 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
@@ -101,16 +101,16 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest {
}
});
- TestMessage message = new TestMessage();
+ TestMessage msg = new TestMessage();
for (int i = 1; i <= SAMPLE_CNT; i++) {
- mgr0.send(grid(1).localNode(), topic, message, GridIoPolicy.PUBLIC_POOL);
+ mgr0.send(grid(1).localNode(), topic, msg, GridIoPolicy.PUBLIC_POOL);
TestMessage1 mes1 = new TestMessage1();
mes1.init(new GridTestMessage(grid(1).localNode().id(), i, 0));
- message.add(mes1);
+ msg.add(mes1);
}
assert latch.await(3, SECONDS);
@@ -180,7 +180,11 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest {
}
}
+ /**
+ *
+ */
private static class TestMessage1 extends GridCacheMessage {
+ /** */
GridTestMessage mes;
public void init(GridTestMessage mes) {
@@ -189,7 +193,7 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override public byte directType() {
- return 0;
+ return DIRECT_TYPE1;
}
/** {@inheritDoc} */
[5/6] incubator-ignite git commit: Merge remote-tracking branch
'origin/ignite-51' into ignite-51
Posted by sb...@apache.org.
Merge remote-tracking branch 'origin/ignite-51' into ignite-51
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/810be9a2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/810be9a2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/810be9a2
Branch: refs/heads/ignite-51
Commit: 810be9a2fc62f0cc7837ed3d7b8a55b71fc66981
Parents: ea5bd46 2f43d32
Author: sboikov <sb...@gridgain.com>
Authored: Mon Mar 2 16:25:49 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Mar 2 16:25:49 2015 +0300
----------------------------------------------------------------------
.../communication/GridCacheMessageSelfTest.java | 88 ++++++++++++++++++--
1 file changed, 79 insertions(+), 9 deletions(-)
----------------------------------------------------------------------