You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/03/02 14:29:51 UTC

[1/6] incubator-ignite git commit: # ignite-51

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-51 2f43d32d1 -> 7f9a6630b


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 167c2e9..7f39503 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -366,7 +366,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                 return new GridFinishedFuture<>(cctx.kernalContext(), e);
             }
         }
-        else
+        else {
             return cctx.kernalContext().closure().callLocalSafe(
                 new GPC<Boolean>() {
                     @Override public Boolean call() throws Exception {
@@ -381,6 +381,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                     }
                 },
                 true);
+        }
     }
 
     /**
@@ -502,8 +503,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                         if (intercept || !F.isEmpty(e.entryProcessors()))
                             e.cached().unswap(true, false);
 
-                        // TODO IGNITE-51 (do not need convert to CacheObject to pass to store?).
-                        GridTuple3<GridCacheOperation, CacheObject, byte[]> res = applyTransformClosures(e, false);
+                        IgniteBiTuple<GridCacheOperation, CacheObject> res = applyTransformClosures(e, false);
 
                         GridCacheContext cacheCtx = e.context();
 
@@ -716,7 +716,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                     if (!F.isEmpty(txEntry.entryProcessors()) || !F.isEmpty(txEntry.filters()))
                                         txEntry.cached().unswap(true, false);
 
-                                    GridTuple3<GridCacheOperation, CacheObject, byte[]> res = applyTransformClosures(txEntry,
+                                    IgniteBiTuple<GridCacheOperation, CacheObject> res = applyTransformClosures(txEntry,
                                         true);
 
                                     // For near local transactions we must record DHT version
@@ -740,7 +740,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
                                     GridCacheOperation op = res.get1();
                                     CacheObject val = res.get2();
-                                    byte[] valBytes = res.get3();
 
                                     // Deal with conflicts.
                                     GridCacheVersion explicitVer = txEntry.conflictVersion() != null ?
@@ -814,7 +813,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                             eventNodeId(),
                                             txEntry.nodeId(),
                                             val,
-                                            valBytes,
                                             false,
                                             false,
                                             txEntry.ttl(),
@@ -834,7 +832,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                                 eventNodeId(),
                                                 nodeId,
                                                 val,
-                                                valBytes,
                                                 false,
                                                 false,
                                                 txEntry.ttl(),
@@ -1101,7 +1098,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
     /**
      * Checks if there is a cached or swapped value for
-     * {@link #getAllAsync(GridCacheContext, Collection, GridCacheEntryEx, boolean, boolean)} method.
+     * {@link #getAllAsync(GridCacheContext, Collection, GridCacheEntryEx, boolean, boolean, boolean)} method.
      *
      * @param cacheCtx Cache context.
      * @param keys Key to enlist.
@@ -1112,20 +1109,22 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
      * @param keysCnt Keys count (to avoid call to {@code Collection.size()}).
      * @param deserializePortable Deserialize portable flag.
      * @param skipVals Skip values flag.
+     * @param keepCacheObjects Keep cache objects flag.
      * @throws IgniteCheckedException If failed.
      * @return Enlisted keys.
      */
     @SuppressWarnings({"RedundantTypeArguments"})
     private <K, V> Collection<KeyCacheObject> enlistRead(
         final GridCacheContext cacheCtx,
-        Collection<? extends K> keys,
+        Collection<KeyCacheObject> keys,
         @Nullable GridCacheEntryEx cached,
         @Nullable ExpiryPolicy expiryPlc,
         Map<K, V> map,
         Map<KeyCacheObject, GridCacheVersion> missed,
         int keysCnt,
         boolean deserializePortable,
-        boolean skipVals
+        boolean skipVals,
+        boolean keepCacheObjects
     ) throws IgniteCheckedException {
         assert !F.isEmpty(keys);
         assert keysCnt == keys.size();
@@ -1144,16 +1143,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
         // In this loop we cover only read-committed or optimistic transactions.
         // Transactions that are pessimistic and not read-committed are covered
         // outside of this loop.
-        for (K key : keys) {
-            if (key == null)
-                continue;
-
+        for (KeyCacheObject key : keys) {
             if (pessimistic() && !readCommitted() && !skipVals)
                 addActiveCache(cacheCtx);
 
-            KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key);
-
-            IgniteTxKey txKey = cacheCtx.txKey(cacheKey);
+            IgniteTxKey txKey = cacheCtx.txKey(key);
 
             // Check write map (always check writes first).
             IgniteTxEntry txEntry = entry(txKey);
@@ -1167,14 +1161,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                     if (!F.isEmpty(txEntry.entryProcessors()))
                         val = txEntry.applyEntryProcessors(val);
 
-                    if (val != null) {
-                        Object val0 = val.value(cacheCtx, true);
-
-                        if (cacheCtx.portableEnabled())
-                            val0 = cacheCtx.unwrapPortableIfNeeded(val0, !deserializePortable);
-
-                        map.put(key, (V)CU.skipValue(val0, skipVals));
-                    }
+                    if (val != null)
+                        cacheCtx.addResult(map, key, val, skipVals, keepCacheObjects, deserializePortable, false);
                 }
                 else {
                     assert txEntry.op() == TRANSFORM || (groupLock() && !txEntry.groupLockEntry());
@@ -1205,15 +1193,16 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                 if (!F.isEmpty(txEntry.entryProcessors()))
                                     val = txEntry.applyEntryProcessors(val);
 
-                                Object val0 = val.value(cacheCtx, !skipVals);
-
-                                if (cacheCtx.portableEnabled())
-                                    val0 = cacheCtx.unwrapPortableIfNeeded(val0, !deserializePortable);
-
-                                map.put(key, (V)CU.skipValue(val0, skipVals));
+                                cacheCtx.addResult(map,
+                                    key,
+                                    val,
+                                    skipVals,
+                                    keepCacheObjects,
+                                    deserializePortable,
+                                    false);
                             }
                             else
-                                missed.put(cacheKey, txEntry.cached().version());
+                                missed.put(key, txEntry.cached().version());
 
                             break;
                         }
@@ -1233,10 +1222,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
             // First time access within transaction.
             else {
                 if (lockKeys == null && !skipVals)
-                    lockKeys = single ? Collections.singleton(cacheKey) : new ArrayList<KeyCacheObject>(keysCnt);
+                    lockKeys = single ? Collections.singleton(key) : new ArrayList<KeyCacheObject>(keysCnt);
 
                 if (!single && !skipVals)
-                    lockKeys.add(cacheKey);
+                    lockKeys.add(key);
 
                 while (true) {
                     GridCacheEntryEx entry;
@@ -1273,19 +1262,20 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                 accessPlc);
 
                             if (val != null) {
-                                Object val0 = val.value(cacheCtx, false);
-
-                                if (cacheCtx.portableEnabled())
-                                    val0 = cacheCtx.unwrapPortableIfNeeded(val0, !deserializePortable);
-
-                                map.put(key, (V)CU.skipValue(val0, skipVals));
+                                cacheCtx.addResult(map,
+                                    key,
+                                    val,
+                                    skipVals,
+                                    keepCacheObjects,
+                                    deserializePortable,
+                                    false);
                             }
                             else
-                                missed.put(cacheKey, ver);
+                                missed.put(key, ver);
                         }
                         else
                             // We must wait for the lock in pessimistic mode.
-                            missed.put(cacheKey, ver);
+                            missed.put(key, ver);
 
                         if (!readCommitted() && !skipVals) {
                             txEntry = addEntry(READ,
@@ -1392,22 +1382,24 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
     /**
      * Loads all missed keys for
-     * {@link #getAllAsync(GridCacheContext, Collection, GridCacheEntryEx, boolean, boolean)} method.
+     * {@link #getAllAsync(GridCacheContext, Collection, GridCacheEntryEx, boolean, boolean, boolean)} method.
      *
      * @param cacheCtx Cache context.
      * @param map Return map.
      * @param missedMap Missed keys.
      * @param redos Keys to retry.
      * @param deserializePortable Deserialize portable flag.
+     * @param keepCacheObjects Keep cache objects flag.
      * @return Loaded key-value pairs.
      */
     private <K, V> IgniteInternalFuture<Map<K, V>> checkMissed(
         final GridCacheContext cacheCtx,
         final Map<K, V> map,
         final Map<KeyCacheObject, GridCacheVersion> missedMap,
-        @Nullable final Collection<K> redos,
+        @Nullable final Collection<KeyCacheObject> redos,
         final boolean deserializePortable,
-        final boolean skipVals
+        final boolean skipVals,
+        final boolean keepCacheObjects
     ) {
         assert redos != null || pessimistic();
 
@@ -1443,7 +1435,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
                     CacheObject cacheVal = cacheCtx.toCacheObject(val);
 
-                    Object visibleVal = val;
+                    CacheObject visibleVal = cacheVal;
 
                     IgniteTxKey txKey = cacheCtx.txKey(key);
 
@@ -1506,16 +1498,30 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                             if (readCommitted() || groupLock() || skipVals) {
                                 cacheCtx.evicts().touch(e, topologyVersion());
 
-                                if (visibleVal != null)
-                                    map.put(key.<K>value(cacheCtx, false), (V)CU.skipValue(visibleVal, skipVals));
+                                if (visibleVal != null) {
+                                    cacheCtx.addResult(map,
+                                        key,
+                                        visibleVal,
+                                        skipVals,
+                                        keepCacheObjects,
+                                        deserializePortable,
+                                        false);
+                                }
                             }
                             else {
                                 assert txEntry != null;
 
                                 txEntry.setAndMarkValid(cacheVal);
 
-                                if (visibleVal != null)
-                                    map.put(key.<K>value(cacheCtx, false), (V)visibleVal);
+                                if (visibleVal != null) {
+                                    cacheCtx.addResult(map,
+                                        key,
+                                        visibleVal,
+                                        skipVals,
+                                        keepCacheObjects,
+                                        deserializePortable,
+                                        false);
+                                }
                             }
 
                             loaded.add(key);
@@ -1575,10 +1581,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
     /** {@inheritDoc} */
     @Override public <K, V> IgniteInternalFuture<Map<K, V>> getAllAsync(
         final GridCacheContext cacheCtx,
-        Collection<? extends K> keys,
+        Collection<KeyCacheObject> keys,
         @Nullable GridCacheEntryEx cached,
         final boolean deserializePortable,
-        final boolean skipVals) {
+        final boolean skipVals,
+        final boolean keepCacheObjects) {
         if (F.isEmpty(keys))
             return new GridFinishedFuture<>(cctx.kernalContext(), Collections.<K, V>emptyMap());
 
@@ -1595,7 +1602,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
             final Map<KeyCacheObject, GridCacheVersion> missed = new GridLeanMap<>(pessimistic() ? keysCnt : 0);
 
-            GridCacheProjectionImpl<K, V> prj = cacheCtx.projectionPerCall();
+            GridCacheProjectionImpl prj = cacheCtx.projectionPerCall();
 
             ExpiryPolicy expiryPlc = prj != null ? prj.expiry() : null;
 
@@ -1607,7 +1614,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                 missed,
                 keysCnt,
                 deserializePortable,
-                skipVals);
+                skipVals,
+                keepCacheObjects);
 
             if (single && missed.isEmpty())
                 return new GridFinishedFuture<>(cctx.kernalContext(), retMap);
@@ -1636,7 +1644,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
                         // Load keys only after the locks have been acquired.
                         for (KeyCacheObject cacheKey : lockKeys) {
-                            K keyVal = cacheKey.<K>value(cacheCtx, false);
+                            K keyVal = (K)(keepCacheObjects ? cacheKey : cacheKey.value(cacheCtx, false));
 
                             if (retMap.containsKey(keyVal))
                                 // We already have a return value.
@@ -1682,10 +1690,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                         if (!F.isEmpty(txEntry.entryProcessors()))
                                             val0 = txEntry.applyEntryProcessors(val0);
 
-                                        if (cacheCtx.portableEnabled())
-                                            val0 = cacheCtx.unwrapPortableIfNeeded(val0, !deserializePortable);
-
-                                        retMap.put(keyVal, (V)val0);
+                                        cacheCtx.addResult(retMap,
+                                            cacheKey,
+                                            val,
+                                            skipVals,
+                                            keepCacheObjects,
+                                            deserializePortable,
+                                            false);
                                     }
 
                                     // Even though we bring the value back from lock acquisition,
@@ -1718,8 +1729,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                             }
                         }
 
-                        if (!missed.isEmpty() && (cacheCtx.isReplicated() || cacheCtx.isLocal()))
-                            return checkMissed(cacheCtx, retMap, missed, null, deserializePortable, skipVals);
+                        if (!missed.isEmpty() && (cacheCtx.isReplicated() || cacheCtx.isLocal())) {
+                            return checkMissed(cacheCtx,
+                                retMap,
+                                missed,
+                                null,
+                                deserializePortable,
+                                skipVals,
+                                keepCacheObjects);
+                        }
 
                         return new GridFinishedFuture<>(cctx.kernalContext(), Collections.<K, V>emptyMap());
                     }
@@ -1764,12 +1782,14 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
             else {
                 assert optimistic() || readCommitted() || groupLock() || skipVals;
 
-                final Collection<K> redos = new ArrayList<>();
+                final Collection<KeyCacheObject> redos = new ArrayList<>();
 
                 if (!missed.isEmpty()) {
                     if (!readCommitted())
                         for (Iterator<KeyCacheObject> it = missed.keySet().iterator(); it.hasNext(); ) {
-                            K keyVal = it.next().value(cacheCtx, false);
+                            KeyCacheObject cacheKey = it.next();
+
+                            K keyVal = (K)(keepCacheObjects ? cacheKey : cacheKey.value(cacheCtx, false));
 
                             if (retMap.containsKey(keyVal))
                                 it.remove();
@@ -1781,7 +1801,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                     return new GridEmbeddedFuture<>(
                         cctx.kernalContext(),
                         // First future.
-                        checkMissed(cacheCtx, retMap, missed, redos, deserializePortable, skipVals),
+                        checkMissed(cacheCtx, retMap, missed, redos, deserializePortable, skipVals, keepCacheObjects),
                         // Closure that returns another future, based on result from first.
                         new PMC<Map<K, V>>() {
                             @Override public IgniteInternalFuture<Map<K, V>> postMiss(Map<K, V> map) {
@@ -1793,27 +1813,37 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                     log.debug("Starting to future-recursively get values for keys: " + redos);
 
                                 // Future recursion.
-                                return getAllAsync(cacheCtx, redos, null, deserializePortable, skipVals);
+                                return getAllAsync(cacheCtx,
+                                    redos,
+                                    null,
+                                    deserializePortable,
+                                    skipVals,
+                                    true);
                             }
                         },
                         // Finalize.
                         new FinishClosure<Map<K, V>>() {
                             @Override Map<K, V> finish(Map<K, V> loaded) {
                                 for (Map.Entry<K, V> entry : loaded.entrySet()) {
-                                    // TODO IGNITE-51.
-                                    KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(entry.getKey());
+                                    KeyCacheObject cacheKey = (KeyCacheObject)entry.getKey();
 
                                     IgniteTxEntry txEntry = entry(cacheCtx.txKey(cacheKey));
 
-                                    V val = entry.getValue();
+                                    CacheObject val = (CacheObject)entry.getValue();
 
                                     if (!readCommitted())
-                                        txEntry.readValue(cacheCtx.toCacheObject(val));
+                                        txEntry.readValue(val);
 
                                     if (!F.isEmpty(txEntry.entryProcessors()))
                                         val = txEntry.applyEntryProcessors(val);
 
-                                    retMap.put(entry.getKey(), val);
+                                    cacheCtx.addResult(retMap,
+                                        cacheKey,
+                                        val,
+                                        skipVals,
+                                        keepCacheObjects,
+                                        deserializePortable,
+                                        false);
                                 }
 
                                 return retMap;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
index 0391e58..43737e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
@@ -69,14 +69,17 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
      * @param cached Cached entry if this method is called from entry wrapper.
      *      Cached entry is passed if and only if there is only one key in collection of keys.
      * @param deserializePortable Deserialize portable flag.
+     * @param skipVals Skip values flag.
+     * @param keepCacheObjects Keep cache objects flag.
      * @return Future for this get.
      */
     public <K, V> IgniteInternalFuture<Map<K, V>> getAllAsync(
         GridCacheContext cacheCtx,
-        Collection<? extends K> keys,
+        Collection<KeyCacheObject> keys,
         @Nullable GridCacheEntryEx cached,
         boolean deserializePortable,
-        boolean skipVals);
+        boolean skipVals,
+        boolean keepCacheObjects);
 
     /**
      * @param cacheCtx Cache context.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java
index e7c401c..f9cb4cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java
@@ -367,8 +367,8 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay
         // TODO IGNITE-51.
         Collection<? extends IgniteDataLoaderEntry> entries0 = F.viewReadOnly(entries, new C1<Entry<K, V>, IgniteDataLoaderEntry>() {
             @Override public IgniteDataLoaderEntry apply(Entry<K, V> e) {
-                KeyCacheObject key = cacheObjProc.toCacheKeyObject(null, e.getKey());
-                CacheObject val = cacheObjProc.toCacheObject(null, e.getValue());
+                KeyCacheObject key = cacheObjProc.toCacheKeyObject(cacheObjCtx, e.getKey());
+                CacheObject val = cacheObjProc.toCacheObject(cacheObjCtx, e.getValue(), null);
 
                 return new IgniteDataLoaderEntry(key, val);
             }
@@ -461,8 +461,8 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay
     @Override public IgniteFuture<?> addData(K key, V val) {
         A.notNull(key, "key");
 
-        KeyCacheObject key0 = cacheObjProc.toCacheKeyObject(null, key);
-        CacheObject val0 = cacheObjProc.toCacheObject(null, val);
+        KeyCacheObject key0 = cacheObjProc.toCacheKeyObject(cacheObjCtx, key);
+        CacheObject val0 = cacheObjProc.toCacheObject(cacheObjCtx, val, null);
 
         return addDataInternal(Collections.singleton(new IgniteDataLoaderEntry(key0, val0)));
     }
@@ -1332,7 +1332,6 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay
                     entry.unswap(true, false);
 
                     entry.initialValue(e.getValue(),
-                        null,
                         ver,
                         CU.TTL_ETERNAL,
                         CU.EXPIRE_TIME_ETERNAL,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableProcessor.java
index 880445f..5da5ee9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableProcessor.java
@@ -158,7 +158,7 @@ public interface GridPortableProcessor extends GridProcessor {
      * @param obj Object.
      * @return Cache object.
      */
-    @Nullable public CacheObject toCacheObject(CacheObjectContext ctx, @Nullable Object obj);
+    @Nullable public CacheObject toCacheObject(CacheObjectContext ctx, @Nullable Object obj, byte[] bytes);
 
     /**
      * @param obj Key value.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/os/GridOsPortableProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/os/GridOsPortableProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/os/GridOsPortableProcessor.java
index 8738bf4..414c292 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/os/GridOsPortableProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/os/GridOsPortableProcessor.java
@@ -143,10 +143,13 @@ public class GridOsPortableProcessor extends IgniteCacheObjectProcessorAdapter {
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public CacheObject toCacheObject(CacheObjectContext ctx, @Nullable Object obj) {
-        if (obj == null || obj instanceof CacheObject)
+    @Nullable @Override public CacheObject toCacheObject(CacheObjectContext ctx, @Nullable Object obj, byte[] bytes) {
+        if ((obj == null && bytes == null) || obj instanceof CacheObject)
             return (CacheObject)obj;
 
+        if (bytes != null)
+            return new CacheObjectImpl(obj, bytes);
+
         return new UserCacheObjectImpl(obj);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index 1aa7c96..b53386c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -1460,8 +1460,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
             m.put(null, 2);
 
             GridTestUtils.assertThrows(log, new Callable<Void>() {
-                @Override
-                public Void call() throws Exception {
+                @Override public Void call() throws Exception {
                     cache.putAll(m);
 
                     return null;
@@ -1480,8 +1479,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
             m.put("key4", null);
 
             GridTestUtils.assertThrows(log, new Callable<Void>() {
-                @Override
-                public Void call() throws Exception {
+                @Override public Void call() throws Exception {
                     cache.putAll(m);
 
                     return null;
@@ -1696,7 +1694,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
 
             IgniteFuture<Integer> fut1 = cacheAsync.future();
 
-            assert fut1.get() == null;
+            assertNull(fut1.get());
             assertEquals((Integer)1, cache.get("key"));
 
             cacheAsync.getAndPutIfAbsent("key", 2);
@@ -3934,7 +3932,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
         CacheAffinity<Integer> aff = ignite(0).affinity(null);
 
         boolean near = cfg.getDistributionMode() == CacheDistributionMode.NEAR_PARTITIONED ||
-                cfg.getDistributionMode() == CacheDistributionMode.NEAR_ONLY;
+            cfg.getDistributionMode() == CacheDistributionMode.NEAR_ONLY;
 
         ClusterNode locNode = ignite(0).cluster().localNode();
 
@@ -3964,7 +3962,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
 
             assertEquals(i, getObj1.val);
 
-            if (loc)
+            if (loc && !offHeapValues())
                 assertSame("Same expected [key=" + i + ", primary=" + primary + ", backup=" + backup + ']',
                     putObj1,
                     getObj1);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index 6cdd97e..40691d8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -453,7 +453,6 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
         UUID evtNodeId,
         UUID affNodeId,
         @Nullable CacheObject val,
-        @Nullable byte[] valBytes,
         boolean writeThrough,
         boolean retval,
         long ttl,
@@ -467,7 +466,7 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
     }
 
     /** {@inheritDoc} */
-    @Override public GridTuple3<Boolean, CacheObject, EntryProcessorResult<Object>> innerUpdateLocal(
+    @Override public GridTuple3<Boolean, Object, EntryProcessorResult<Object>> innerUpdateLocal(
         GridCacheVersion ver,
         GridCacheOperation op,
         @Nullable Object writeObj,
@@ -492,7 +491,6 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
         UUID affNodeId,
         GridCacheOperation op,
         @Nullable Object val,
-        @Nullable byte[] valBytes,
         @Nullable Object[] invokeArgs,
         boolean writeThrough,
         boolean retval,
@@ -653,7 +651,6 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
 
     /** @inheritDoc */
     @Override public boolean initialValue(CacheObject val,
-        @Nullable byte[] valBytes,
         GridCacheVersion ver,
         long ttl,
         long expireTime,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedFullApiSelfTest.java
index 3336798..4b5a260 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedFullApiSelfTest.java
@@ -38,6 +38,7 @@ public class GridCacheReplicatedFullApiSelfTest extends GridCacheAbstractFullApi
         return PARTITIONED_ONLY;
     }
 
+    /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration c = super.getConfiguration(gridName);
 


[3/6] incubator-ignite git commit: # ignite-51

Posted by sb...@apache.org.
# ignite-51


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a040311d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a040311d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a040311d

Branch: refs/heads/ignite-51
Commit: a040311d2eb88d4d3e49aa9c885007c85a6a442b
Parents: bdb0f55
Author: sboikov <sb...@gridgain.com>
Authored: Mon Mar 2 10:13:46 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Mar 2 16:03:57 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/CacheObjectAdapter.java    |   1 +
 .../processors/cache/GridCacheAdapter.java      |  46 ++---
 .../processors/cache/GridCacheContext.java      |  45 ++--
 .../processors/cache/GridCacheEntryEx.java      |  15 +-
 .../processors/cache/GridCacheMapEntry.java     | 206 +++++++++----------
 .../processors/cache/GridCacheMessage.java      |  28 ++-
 .../processors/cache/GridCacheSwapManager.java  |  44 ++--
 .../processors/cache/KeyCacheObjectImpl.java    |   5 +-
 .../cache/UserKeyCacheObjectImpl.java           |  31 ++-
 .../GridDistributedCacheAdapter.java            |  14 +-
 .../GridDistributedTxPrepareRequest.java        | 110 +++++++---
 .../GridDistributedTxRemoteAdapter.java         |  25 ++-
 .../distributed/dht/GridDhtCacheAdapter.java    |   2 +-
 .../distributed/dht/GridDhtCacheEntry.java      |  30 ++-
 .../cache/distributed/dht/GridDhtGetFuture.java |  29 ++-
 .../distributed/dht/GridDhtLockFuture.java      |   2 +-
 .../dht/GridDhtTransactionalCacheAdapter.java   |  14 +-
 .../distributed/dht/GridDhtTxLocalAdapter.java  |  38 +---
 .../distributed/dht/GridDhtTxPrepareFuture.java |   4 +-
 .../dht/GridDhtTxPrepareRequest.java            |  50 ++---
 .../dht/GridPartitionedGetFuture.java           |  20 +-
 .../dht/atomic/GridDhtAtomicCache.java          |  42 ++--
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |  13 +-
 .../dht/colocated/GridDhtColocatedCache.java    |  68 +++---
 .../colocated/GridDhtColocatedLockFuture.java   |  16 +-
 .../colocated/GridDhtDetachedCacheEntry.java    |  12 +-
 .../dht/preloader/GridDhtForceKeysFuture.java   |   1 -
 .../preloader/GridDhtPartitionDemandPool.java   |   1 -
 .../distributed/near/GridNearAtomicCache.java   |   6 +-
 .../distributed/near/GridNearCacheAdapter.java  |   2 +-
 .../distributed/near/GridNearCacheEntry.java    |  29 +--
 .../distributed/near/GridNearLockFuture.java    |   4 +-
 .../near/GridNearTransactionalCache.java        |  13 +-
 .../cache/distributed/near/GridNearTxLocal.java | 126 ++++++------
 .../near/GridNearTxPrepareFuture.java           |   4 +-
 .../near/GridNearTxPrepareRequest.java          |  42 ++--
 .../near/GridNearTxPrepareResponse.java         |  22 +-
 .../processors/cache/local/GridLocalCache.java  |  10 +-
 .../cache/local/GridLocalLockFuture.java        |   2 +-
 .../local/atomic/GridLocalAtomicCache.java      |  14 +-
 .../cache/transactions/IgniteTxAdapter.java     |  18 +-
 .../cache/transactions/IgniteTxEntry.java       |   3 +-
 .../transactions/IgniteTxLocalAdapter.java      | 170 ++++++++-------
 .../cache/transactions/IgniteTxLocalEx.java     |   7 +-
 .../dataload/IgniteDataLoaderImpl.java          |   9 +-
 .../portable/GridPortableProcessor.java         |   2 +-
 .../portable/os/GridOsPortableProcessor.java    |   7 +-
 .../cache/GridCacheAbstractFullApiSelfTest.java |  12 +-
 .../processors/cache/GridCacheTestEntryEx.java  |   5 +-
 .../GridCacheReplicatedFullApiSelfTest.java     |   1 +
 50 files changed, 725 insertions(+), 695 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java
index 8edc6c8..5cf3521 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java
@@ -43,6 +43,7 @@ public abstract class CacheObjectAdapter implements CacheObject, Externalizable
     }
 
     /**
+     * @param ctx Context.
      * @return {@code True} need to copy value returned to user.
      */
     protected boolean needCopy(GridCacheContext ctx) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 7cb0b1f..27d0065 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -558,7 +558,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
      * @return Locks future.
      */
     public abstract IgniteInternalFuture<Boolean> txLockAsync(
-        Collection<? extends K> keys,
+        Collection<KeyCacheObject> keys,
         long timeout,
         IgniteTxLocalEx tx,
         boolean isRead,
@@ -2233,16 +2233,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
         if (keyCheck)
             validateCacheKeys(keys);
 
-        Collection<KeyCacheObject> keys0 = F.viewReadOnly(keys, new C1<K, KeyCacheObject>() {
-            @Override public KeyCacheObject apply(K key) {
-                if (key == null)
-                    throw new NullPointerException("Null key.");
-
-                return ctx.toCacheKeyObject(key);
-            }
-        });
-
-        return getAllAsync0(keys0,
+        return getAllAsync0(ctx.cacheKeysView(keys),
             readThrough,
             checkTx,
             subjId,
@@ -2263,7 +2254,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
      * @param expiry Expiry policy.
      * @param skipVals Skip values flag.
      * @param keepCacheObjects Keep cache objects
-     * @return
+     * @return Future.
      */
     public <K1, V1> IgniteInternalFuture<Map<K1, V1>> getAllAsync0(@Nullable final Collection<KeyCacheObject> keys,
         boolean readThrough,
@@ -2502,13 +2493,11 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
             }
         }
         else {
-            return null;
-// TODO IGNITE-51.
-//            return asyncOp(tx, new AsyncOp<Map<K, V>>(keys) {
-//                @Override public IgniteInternalFuture<Map<K, V>> op(IgniteTxLocalAdapter tx) {
-//                    return ctx.wrapCloneMap(tx.<K, V>getAllAsync(ctx, keys, cached0, deserializePortable, skipVals));
-//                }
-//            });
+            return asyncOp(tx, new AsyncOp<Map<K1, V1>>(keys) {
+                @Override public IgniteInternalFuture<Map<K1, V1>> op(IgniteTxLocalAdapter tx) {
+                    return tx.getAllAsync(ctx, keys, null, deserializePortable, skipVals, false);
+                }
+            });
         }
     }
 
@@ -4166,7 +4155,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
         GridCacheEntryEx entry = entryEx(key, false);
 
         try {
-            entry.initialValue(cacheVal, null, ver, ttl, CU.EXPIRE_TIME_CALCULATE, false, topVer,
+            entry.initialValue(cacheVal, ver, ttl, CU.EXPIRE_TIME_CALCULATE, false, topVer,
                 replicate ? DR_LOAD : DR_NONE);
         }
         catch (IgniteCheckedException e) {
@@ -4615,8 +4604,12 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
             GridCacheEntryEx entry = peekEx(cacheKey);
 
             try {
-                if (entry == null || entry.obsolete() || entry.isNewLocked())
+                if (entry == null || entry.obsolete() || entry.isNewLocked()) {
+                    if (entry != null)
+                        cacheKey = entry.key();
+
                     unswap.add(cacheKey);
+                }
             }
             catch (GridCacheEntryRemovedException ignored) {
                 // No-op.
@@ -5856,7 +5849,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
         private final boolean single;
 
         /** Keys. */
-        private final Collection<? extends K> keys;
+        private final Collection<?> keys;
 
         /**
          * @param key Key.
@@ -5870,7 +5863,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
         /**
          * @param keys Keys involved.
          */
-        protected AsyncOp(Collection<? extends K> keys) {
+        protected AsyncOp(Collection<?> keys) {
             this.keys = keys;
 
             single = keys.size() == 1;
@@ -5884,13 +5877,6 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
         }
 
         /**
-         * @return Keys.
-         */
-        Collection<? extends K> keys() {
-            return keys;
-        }
-
-        /**
          * @param tx Transaction.
          * @return Operation return value.
          */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 3b4e3df..3cbe8a1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -1322,26 +1322,6 @@ public class GridCacheContext<K, V> implements Externalizable {
     }
 
     /**
-     * @param f Target future.
-     * @return Wrapped future that is aware of cloning behaviour.
-     */
-    public IgniteInternalFuture<Map<K, V>> wrapCloneMap(IgniteInternalFuture<Map<K, V>> f) {
-        if (!hasFlag(CLONE))
-            return f;
-
-        return f.chain(new CX1<IgniteInternalFuture<Map<K, V>>, Map<K, V>>() {
-            @Override public Map<K, V> applyx(IgniteInternalFuture<Map<K, V>> f) throws IgniteCheckedException {
-                Map<K, V> map = new GridLeanMap<>();
-
-                for (Map.Entry<K, V> e : f.get().entrySet())
-                    map.put(e.getKey(), cloneValue(e.getValue()));
-
-                return map;
-            }
-        });
-    }
-
-    /**
      * Creates Runnable that can be executed safely in a different thread inheriting
      * the same thread local projection as for the current thread. If no projection is
      * set for current thread then there's no need to create new object and method simply
@@ -1780,7 +1760,15 @@ public class GridCacheContext<K, V> implements Externalizable {
      * @return Cache object.
      */
     @Nullable public CacheObject toCacheObject(@Nullable Object obj) {
-        return portable().toCacheObject(cacheObjCtx, obj);
+        return portable().toCacheObject(cacheObjCtx, obj, null);
+    }
+
+    /**
+     * @param obj Object.
+     * @return Cache object.
+     */
+    @Nullable public CacheObject toCacheObject(@Nullable Object obj, byte[] bytes) {
+        return portable().toCacheObject(cacheObjCtx, obj, bytes);
     }
 
     /**
@@ -1946,6 +1934,21 @@ public class GridCacheContext<K, V> implements Externalizable {
                     mgr.printMemoryStats();
     }
 
+    /**
+     * @param keys Keys.
+     * @return Read-only view of the given keys converted to cache key objects.
+     */
+    public Collection<KeyCacheObject> cacheKeysView(Collection<?> keys) {
+        return F.viewReadOnly(keys, new C1<Object, KeyCacheObject>() {
+            @Override public KeyCacheObject apply(Object key) {
+                if (key == null)
+                    throw new NullPointerException("Null key.");
+
+                return toCacheKeyObject(key);
+            }
+        });
+    };
+
     /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         U.writeString(out, gridName());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 8ed070e..30df242 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -244,7 +244,6 @@ public interface GridCacheEntryEx {
      * @return Swap entry if this entry was marked obsolete, {@code null} if entry was not evicted.
      * @throws IgniteCheckedException If failed.
      */
-    // TODO IGNITE-51
     public GridCacheBatchSwapEntry evictInBatchInternal(GridCacheVersion obsoleteVer) throws IgniteCheckedException;
 
     /**
@@ -333,7 +332,6 @@ public interface GridCacheEntryEx {
      * @param evtNodeId ID of node responsible for this change.
      * @param affNodeId Partitioned node iD.
      * @param val Value to set.
-     * @param valBytes Value bytes to set.
      * @param writeThrough If {@code true} then persist to storage.
      * @param retval {@code True} if value should be returned (and unmarshalled if needed).
      * @param ttl Time to live.
@@ -356,7 +354,6 @@ public interface GridCacheEntryEx {
         UUID evtNodeId,
         UUID affNodeId,
         @Nullable CacheObject val,
-        @Nullable byte[] valBytes,
         boolean writeThrough,
         boolean retval,
         long ttl,
@@ -412,7 +409,6 @@ public interface GridCacheEntryEx {
      * @param affNodeId Affinity node ID.
      * @param op Update operation.
      * @param val Value. Type depends on operation.
-     * @param valBytes Value bytes. Can be non-null only if operation is UPDATE.
      * @param invokeArgs Optional arguments for entry processor.
      * @param writeThrough Write through flag.
      * @param retval Return value flag.
@@ -445,7 +441,6 @@ public interface GridCacheEntryEx {
         UUID affNodeId,
         GridCacheOperation op,
         @Nullable Object val,
-        @Nullable byte[] valBytes,
         @Nullable Object[] invokeArgs,
         boolean writeThrough,
         boolean retval,
@@ -485,7 +480,7 @@ public interface GridCacheEntryEx {
      * @throws IgniteCheckedException If update failed.
      * @throws GridCacheEntryRemovedException If entry is obsolete.
      */
-    public GridTuple3<Boolean, CacheObject, EntryProcessorResult<Object>> innerUpdateLocal(
+    public GridTuple3<Boolean, Object, EntryProcessorResult<Object>> innerUpdateLocal(
         GridCacheVersion ver,
         GridCacheOperation op,
         @Nullable Object writeObj,
@@ -501,7 +496,6 @@ public interface GridCacheEntryEx {
         String taskName
     ) throws IgniteCheckedException, GridCacheEntryRemovedException;
 
-
     /**
      * Marks entry as obsolete and, if possible or required, removes it
      * from swap storage.
@@ -662,7 +656,6 @@ public interface GridCacheEntryEx {
      * Sets new value if current version is <tt>0</tt>
      *
      * @param val New value.
-     * @param valBytes Value bytes.
      * @param ver Version to use.
      * @param ttl Time to live.
      * @param expireTime Expiration time.
@@ -674,7 +667,6 @@ public interface GridCacheEntryEx {
      * @throws GridCacheEntryRemovedException If entry was removed.
      */
     public boolean initialValue(CacheObject val,
-        @Nullable byte[] valBytes,
         GridCacheVersion ver,
         long ttl,
         long expireTime,
@@ -692,7 +684,6 @@ public interface GridCacheEntryEx {
      * @throws IgniteCheckedException In case of error.
      * @throws GridCacheEntryRemovedException If entry was removed.
      */
-    // TODO IGNITE-51
     public boolean initialValue(KeyCacheObject key, GridCacheSwapEntry unswapped)
         throws IgniteCheckedException, GridCacheEntryRemovedException;
 
@@ -715,7 +706,9 @@ public interface GridCacheEntryEx {
      * @throws IgniteCheckedException If index could not be updated.
      * @throws GridCacheEntryRemovedException If entry was removed.
      */
-    public boolean versionedValue(CacheObject val, @Nullable GridCacheVersion curVer, @Nullable GridCacheVersion newVer)
+    public boolean versionedValue(CacheObject val,
+        @Nullable GridCacheVersion curVer,
+        @Nullable GridCacheVersion newVer)
         throws IgniteCheckedException, GridCacheEntryRemovedException;
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 74f337b..77a7343 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -120,7 +120,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
     private final int hash;
 
     /** Off-heap value pointer. */
-    private long valPtr;
+    protected long valPtr;
 
     /** Extras */
     @GridToStringInclude
@@ -162,7 +162,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
         val = (CacheObject)cctx.kernalContext().portable().detachPortable(val, cctx);
 
         synchronized (this) {
-            value(val, null);
+            value(val);
         }
 
         next(hdrId, next);
@@ -181,9 +181,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
      * Sets entry value. If off-heap value storage is enabled, will serialize value to off-heap.
      *
      * @param val Value to store.
-     * @param valBytes Value bytes to store.
      */
-    protected void value(@Nullable CacheObject val, @Nullable byte[] valBytes) {
+    protected void value(@Nullable CacheObject val) {
         assert Thread.holdsLock(this);
 
         // In case we deal with IGFS cache, count updated data
@@ -205,17 +204,18 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
         else {
             try {
                 if (cctx.kernalContext().config().isPeerClassLoadingEnabled()) {
-                    if (val != null || valBytes != null) {
-                        if (val == null)
-                            val = cctx.marshaller().unmarshal(valBytes, cctx.deploy().globalLoader());
+                    Object val0 = null;
 
-                        if (val != null)
-                            cctx.gridDeploy().deploy(val.getClass(), val.getClass().getClassLoader());
+                    if (val != null) {
+                        val0 = val.value(cctx, false);
+
+                        if (val0 != null)
+                            cctx.gridDeploy().deploy(val0.getClass(), val0.getClass().getClassLoader());
                     }
 
-                    if (U.p2pLoader(val)) {
+                    if (U.p2pLoader(val0)) {
                         cctx.deploy().addDeploymentContext(
-                            new GridDeploymentInfoBean((GridDeploymentInfo)val.getClass().getClassLoader()));
+                            new GridDeploymentInfoBean((GridDeploymentInfo)val0.getClass().getClassLoader()));
                     }
                 }
 
@@ -527,7 +527,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
                         val = (CacheObject)cctx.kernalContext().portable().detachPortable(val, cctx);
 
                         // Set unswapped value.
-                        update(val, e.valueBytes(), e.expireTime(), e.ttl(), e.version());
+                        update(val, e.expireTime(), e.ttl(), e.version());
 
                         // Must update valPtr again since update() will reset it.
                         if (cctx.offheapTiered() && e.offheapPointer() > 0)
@@ -598,7 +598,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
     /**
      * @return Value bytes and flag indicating whether value is byte array.
      */
-    private IgniteBiTuple<byte[], Boolean> valueBytes0() {
+    protected IgniteBiTuple<byte[], Boolean> valueBytes0() {
         if (valPtr != 0) {
             assert isOffHeapValuesOnly() || cctx.offheapTiered();
 
@@ -784,7 +784,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
             if (expired) {
                 expiredVal = val;
 
-                value(null, null);
+                value(null);
             }
 
             if (old == null && !hasOldBytes) {
@@ -879,12 +879,12 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
 
                     if (loadedFromStore)
                         // Update indexes before actual write to entry.
-                        updateIndex(ret, null, expTime, nextVer, prevVal);
+                        updateIndex(ret, expTime, nextVer, prevVal);
 
                     boolean hadValPtr = valPtr != 0;
 
                     // Don't change version for read-through.
-                    update(ret, null, expTime, ttl, nextVer);
+                    update(ret, expTime, ttl, nextVer);
 
                     if (hadValPtr && cctx.offheapTiered())
                         cctx.swap().removeOffheap(key);
@@ -956,7 +956,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
 
                     // Update indexes.
                     if (ret != null) {
-                        updateIndex(ret, null, expTime, nextVer, old);
+                        updateIndex(ret, expTime, nextVer, old);
 
                         if (cctx.deferredDelete() && !isInternal() && !detached() && deletedUnlocked())
                             deletedUnlocked(false);
@@ -968,7 +968,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
                             deletedUnlocked(true);
                     }
 
-                    update(ret, null, expTime, ttl, nextVer);
+                    update(ret, expTime, ttl, nextVer);
 
                     touch = true;
 
@@ -1000,7 +1000,6 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
         UUID evtNodeId,
         UUID affNodeId,
         CacheObject val,
-        @Nullable byte[] valBytes,
         boolean writeThrough,
         boolean retval,
         long ttl,
@@ -1065,11 +1064,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
 
                 if (interceptorVal == null)
                     return new GridCacheUpdateTxResult(false, (CacheObject)cctx.unwrapTemporary(old));
-                else if (interceptorVal != val0) {
+                else if (interceptorVal != val0)
                     val = cctx.toCacheKeyObject(cctx.unwrapTemporary(interceptorVal));
-
-                    valBytes = null;
-                }
             }
 
             // Determine new ttl and expire time.
@@ -1097,16 +1093,16 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
 
             // Update index inside synchronization since it can be updated
             // in load methods without actually holding entry lock.
-            if (val != null || valBytes != null) {
-                updateIndex(val, valBytes, expireTime, newVer, old);
+            if (val != null) {
+                updateIndex(val, expireTime, newVer, old);
 
                 if (cctx.deferredDelete() && deletedUnlocked() && !isInternal() && !detached())
                     deletedUnlocked(false);
             }
 
-            update(val, valBytes, expireTime, ttl, newVer);
+            update(val, expireTime, ttl, newVer);
 
-            drReplicate(drType, val, valBytes, newVer);
+            drReplicate(drType, val, newVer);
 
             recordNodeId(affNodeId);
 
@@ -1231,7 +1227,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
 
             boolean hadValPtr = valPtr != 0;
 
-            update(null, null, 0, 0, newVer);
+            update(null, 0, 0, newVer);
 
             if (cctx.offheapTiered() && hadValPtr) {
                 boolean rmv = cctx.swap().removeOffheap(key);
@@ -1254,7 +1250,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
                 }
             }
 
-            drReplicate(drType, null, null, newVer);
+            drReplicate(drType, null, newVer);
 
             if (metrics && cctx.cache().configuration().isStatisticsEnabled())
                 cctx.cache().metrics0().onRemove();
@@ -1343,7 +1339,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
-    @Override public GridTuple3<Boolean, CacheObject, EntryProcessorResult<Object>> innerUpdateLocal(
+    @Override public GridTuple3<Boolean, Object, EntryProcessorResult<Object>> innerUpdateLocal(
         GridCacheVersion ver,
         GridCacheOperation op,
         @Nullable Object writeObj,
@@ -1412,11 +1408,11 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
                 old = (CacheObject)cctx.kernalContext().portable().detachPortable(old, cctx);
 
                 if (old != null)
-                    updateIndex(old, null, expireTime, ver, null);
+                    updateIndex(old, expireTime, ver, null);
                 else
                     clearIndex(null);
 
-                update(old, null, expireTime, ttl, ver);
+                update(old, expireTime, ttl, ver);
             }
 
             // Apply metrics.
@@ -1434,7 +1430,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
                     if (expiryPlc != null && !readThrough && filter != cctx.noPeekArray() && hasValueUnlocked())
                         updateTtl(expiryPlc);
 
-                    return new T3<>(false, retval ? old : null, null);
+                    return new T3<>(false, retval ? CU.value(old, cctx, false) : null, null);
                 }
             }
 
@@ -1442,6 +1438,9 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
 
             CacheObject updated;
 
+            Object key0 = null;
+            Object updated0 = null;
+
             // Calculate new value.
             if (op == GridCacheOperation.TRANSFORM) {
                 transformCloClsName = writeObj.getClass().getName();
@@ -1450,16 +1449,19 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
 
                 assert entryProcessor != null;
 
-                // TODO IGNITE-51.
-                CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry<>(cctx,
-                    key.value(cctx, false),
-                    old.value(cctx, false));
+                key0 = key.value(cctx, false);
+                old0 = value(old0, old, false);
+
+                CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry<>(cctx, key0, old0);
 
                 try {
                     Object computed = entryProcessor.process(entry, invokeArgs);
 
-                    if (entry.modified())
-                        updated = cctx.toCacheObject(cctx.unwrapTemporary(entry.getValue()));
+                    if (entry.modified()) {
+                        updated0 = cctx.unwrapTemporary(entry.getValue());
+
+                        updated = cctx.toCacheObject(updated0);
+                    }
                     else
                         updated = old;
 
@@ -1484,19 +1486,26 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
             op = updated == null ? GridCacheOperation.DELETE : GridCacheOperation.UPDATE;
 
             if (intercept) {
-// TODO IGNITE-51.
-//                if (op == GridCacheOperation.UPDATE) {
-//                    updated = (V)cctx.config().getInterceptor().onBeforePut(key, old, updated);
-//
-//                    if (updated == null)
-//                        return new GridTuple3<>(false, cctx.<V>unwrapTemporary(old), invokeRes);
-//                }
-//                else {
-//                    interceptorRes = cctx.config().getInterceptor().onBeforeRemove(key, old);
-//
-//                    if (cctx.cancelRemove(interceptorRes))
-//                        return new GridTuple3<>(false, cctx.<V>unwrapTemporary(interceptorRes.get2()), invokeRes);
-//                }
+                if (op == GridCacheOperation.UPDATE) {
+                    key0 = value(key0, key, false);
+                    updated0 = value(updated0, updated, false);
+                    old0 = value(old0, old, false);
+
+                    Object interceptorVal
+                        = cctx.config().getInterceptor().onBeforePut(key0, old0, updated0);
+
+                    if (interceptorVal == null)
+                        return new GridTuple3<>(false, cctx.unwrapTemporary(old0), invokeRes);
+                }
+                else {
+                    key0 = value(key0, key, false);
+                    old0 = value(old0, old, false);
+
+                    interceptorRes = cctx.config().getInterceptor().onBeforeRemove(key0, old0);
+
+                    if (cctx.cancelRemove(interceptorRes))
+                        return new GridTuple3<>(false, cctx.unwrapTemporary(interceptorRes.get2()), invokeRes);
+                }
             }
 
             boolean hadVal = hasValueUnlocked();
@@ -1535,11 +1544,11 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
 
                 // Update index inside synchronization since it can be updated
                 // in load methods without actually holding entry lock.
-                updateIndex(updated, null, expireTime, ver, old);
+                updateIndex(updated, expireTime, ver, old);
 
                 assert ttl != CU.TTL_ZERO;
 
-                update(updated, null, expireTime, ttl, ver);
+                update(updated, expireTime, ttl, ver);
 
                 if (evt) {
                     CacheObject evtOld = null;
@@ -1571,7 +1580,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
                 // in load methods without actually holding entry lock.
                 clearIndex(old);
 
-                update(null, null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, ver);
+                update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, ver);
 
                 if (evt) {
                     CacheObject evtOld = null;
@@ -1603,15 +1612,14 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
 
             if (intercept) {
                 if (op == GridCacheOperation.UPDATE)
-                    cctx.config().getInterceptor().onAfterPut(key, val);
+                    cctx.config().getInterceptor().onAfterPut(key0, val);
                 else
-                    cctx.config().getInterceptor().onAfterRemove(key, old);
+                    cctx.config().getInterceptor().onAfterRemove(key0, old0);
             }
         }
 
-        // TODO IGNITE-51.
         return new GridTuple3<>(res,
-            cctx.<CacheObject>unwrapTemporary(interceptorRes != null ? interceptorRes.get2() : old),
+            cctx.unwrapTemporary(interceptorRes != null ? interceptorRes.get2() : CU.value(old, cctx, false)),
             invokeRes);
     }
 
@@ -1623,7 +1631,6 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
         UUID affNodeId,
         GridCacheOperation op,
         @Nullable Object writeObj,
-        @Nullable byte[] valBytes,
         @Nullable Object[] invokeArgs,
         boolean writeThrough,
         boolean retval,
@@ -1814,7 +1821,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
                 (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue()))) {
                 old0 = readThrough(null, key, false, subjId, taskName);
 
-                oldVal = cctx.toCacheObject(oldVal);
+                oldVal = cctx.toCacheObject(old0);
 
                 readThrough = true;
 
@@ -1837,11 +1844,11 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
                 }
 
                 if (oldVal != null)
-                    updateIndex(oldVal, null, initExpireTime, ver, null);
+                    updateIndex(oldVal, initExpireTime, ver, null);
                 else
                     clearIndex(null);
 
-                update(oldVal, null, initExpireTime, initTtl, ver);
+                update(oldVal, initExpireTime, initTtl, ver);
 
                 if (deletedUnlocked() && oldVal != null && !isInternal())
                     deletedUnlocked(false);
@@ -1856,7 +1863,6 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
 
             // Check filter inside of synchronization.
             if (!F.isEmptyOrNulls(filter)) {
-                // TODO IGNITE-51 can get key/value only once.
                 boolean pass = cctx.isAll(wrapFilterLocked(), filter);
 
                 if (!pass) {
@@ -1903,15 +1909,11 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
                     if (computed != null)
                         invokeRes = new CacheInvokeDirectResult(key,
                             cctx.toCacheObject(cctx.unwrapTemporary(computed)));
-
-                    valBytes = null;
                 }
                 catch (Exception e) {
                     invokeRes = new CacheInvokeDirectResult(key, e);
 
                     updated = oldVal;
-
-                    valBytes = oldValBytes.getIfMarshaled();
                 }
 
                 if (!entry.modified()) {
@@ -1981,7 +1983,6 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
                             newExpireTime = CU.EXPIRE_TIME_ETERNAL;
 
                             updated = null;
-                            valBytes = null;
                         }
                         else {
                             newSysExpireTime = CU.EXPIRE_TIME_CALCULATE;
@@ -2030,10 +2031,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
                             null,
                             null,
                             false);
-                    else if (interceptorVal != updated0) {
+                    else if (interceptorVal != updated0)
                         updated = cctx.toCacheObject(cctx.unwrapTemporary(updated0));
-                        valBytes = null;
-                    }
                 }
 
                 // Try write-through.
@@ -2061,11 +2060,11 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
 
                 // Update index inside synchronization since it can be updated
                 // in load methods without actually holding entry lock.
-                updateIndex(updated, valBytes, newExpireTime, newVer, oldVal);
+                updateIndex(updated, newExpireTime, newVer, oldVal);
 
-                update(updated, valBytes, newExpireTime, newTtl, newVer);
+                update(updated, newExpireTime, newTtl, newVer);
 
-                drReplicate(drType, updated, valBytes, newVer);
+                drReplicate(drType, updated, newVer);
 
                 recordNodeId(affNodeId);
 
@@ -2140,7 +2139,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
                 boolean hasValPtr = valPtr != 0;
 
                 // Clear value on backup. Entry will be removed from cache when it got evicted from queue.
-                update(null, null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, newVer);
+                update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, newVer);
 
                 assert newSysTtl == CU.TTL_NOT_CHANGED;
                 assert newSysExpireTime == CU.EXPIRE_TIME_CALCULATE;
@@ -2155,7 +2154,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
 
                 recordNodeId(affNodeId);
 
-                drReplicate(drType, null, null, newVer);
+                drReplicate(drType, null, newVer);
 
                 if (evt) {
                     CacheObject evtOld = null;
@@ -2304,11 +2303,10 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
      *
      * @param drType DR type.
      * @param val Value.
-     * @param valBytes Value bytes.
      * @param ver Version.
      * @throws IgniteCheckedException In case of exception.
      */
-    private void drReplicate(GridDrType drType, @Nullable CacheObject val, @Nullable byte[] valBytes, GridCacheVersion ver)
+    private void drReplicate(GridDrType drType, @Nullable CacheObject val, GridCacheVersion ver)
         throws IgniteCheckedException {
 // TODO IGNITE-51.
 //        if (cctx.isDrEnabled() && drType != DR_NONE && !isInternal())
@@ -2456,7 +2454,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
 
                     if (cctx.deferredDelete() && !isStartVersion() && !detached() && !isInternal()) {
                         if (!deletedUnlocked()) {
-                            update(null, null, 0L, 0L, ver);
+                            update(null, 0L, 0L, ver);
 
                             deletedUnlocked(true);
 
@@ -2528,7 +2526,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
                 obsoleteVersionExtras(obsoleteVer);
 
                 if (clear)
-                    value(null, null);
+                    value(null);
             }
 
             return obsoleteVer != null;
@@ -2562,7 +2560,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
         if (curVer == null || ver.equals(curVer)) {
             CacheObject val = saveValueForIndexUnlocked();
 
-            value(null, null);
+            value(null);
 
             ver = newVer;
 
@@ -2666,13 +2664,11 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
     /**
      *
      * @param val New value.
-     * @param valBytes New value bytes.
      * @param expireTime Expiration time.
      * @param ttl Time to live.
      * @param ver Update version.
      */
-    protected final void update(@Nullable CacheObject val, @Nullable byte[] valBytes, long expireTime, long ttl,
-        GridCacheVersion ver) {
+    protected final void update(@Nullable CacheObject val, long expireTime, long ttl, GridCacheVersion ver) {
         assert ver != null;
         assert Thread.holdsLock(this);
         assert ttl != CU.TTL_ZERO && ttl != CU.TTL_NOT_CHANGED && ttl >= 0 : ttl;
@@ -2682,7 +2678,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
         if (oldExpireTime != 0 && expireTime != oldExpireTime && cctx.config().isEagerTtl())
             cctx.ttl().removeTrackedEntry(this);
 
-        value(val, valBytes);
+        value(val);
 
         ttlAndExpireTimeExtras(ttl, expireTime);
 
@@ -2986,9 +2982,9 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
 
             val = (CacheObject)cctx.kernalContext().portable().detachPortable(val, cctx);
 
-            updateIndex(val, null, expireTime, nextVer, old);
+            updateIndex(val, expireTime, nextVer, old);
 
-            update(val, null, expireTime, ttlExtras(), nextVer);
+            update(val, expireTime, ttlExtras(), nextVer);
         }
 
         if (log.isDebugEnabled())
@@ -3238,7 +3234,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
     @Override public synchronized CacheObject rawPut(CacheObject val, long ttl) {
         CacheObject old = this.val;
 
-        update(val, null, CU.toExpireTime(ttl), ttl, nextVersion());
+        update(val, CU.toExpireTime(ttl), ttl, nextVersion());
 
         return old;
     }
@@ -3247,7 +3243,6 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
     @SuppressWarnings({"RedundantTypeArguments"})
     @Override public boolean initialValue(
         CacheObject val,
-        byte[] valBytes,
         GridCacheVersion ver,
         long ttl,
         long expireTime,
@@ -3267,15 +3262,15 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
 
                 val = (CacheObject)cctx.kernalContext().portable().detachPortable(val, cctx);
 
-                if (val != null || valBytes != null)
-                    updateIndex(val, valBytes, expTime, ver, null);
+                if (val != null)
+                    updateIndex(val, expTime, ver, null);
 
                 // Version does not change for load ops.
-                update(val, valBytes, expTime, ttl, ver);
+                update(val, expTime, ttl, ver);
 
                 boolean skipQryNtf = false;
 
-                if (val == null && valBytes == null) {
+                if (val == null) {
                     skipQryNtf = true;
 
                     if (cctx.deferredDelete() && !isInternal()) {
@@ -3287,7 +3282,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
                 else if (deletedUnlocked())
                     deletedUnlocked(false);
 
-                drReplicate(drType, val, valBytes, ver);
+                drReplicate(drType, val, ver);
 
                 if (!skipQryNtf) {
                     if (cctx.isLocal() || cctx.isReplicated() || cctx.affinity().primary(cctx.localNode(), key, topVer))
@@ -3322,7 +3317,6 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
 
             // Version does not change for load ops.
             update(val,
-                unswapped.valueBytes(),
                 unswapped.expireTime(),
                 unswapped.ttl(),
                 unswapped.version()
@@ -3365,14 +3359,14 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
                 val = (CacheObject)cctx.kernalContext().portable().detachPortable(val, cctx);
 
                 if (val != null) {
-                    updateIndex(val, null, expTime, newVer, old);
+                    updateIndex(val, expTime, newVer, old);
 
                     if (deletedUnlocked())
                         deletedUnlocked(false);
                 }
 
                 // Version does not change for load ops.
-                update(val, null, expTime, ttl, newVer);
+                update(val, expTime, ttl, newVer);
             }
 
             return true;
@@ -3584,7 +3578,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
                 if (expired) {
                     if (cctx.deferredDelete() && !detached() && !isInternal()) {
                         if (!deletedUnlocked()) {
-                            update(null, null, 0L, 0L, ver);
+                            update(null, 0L, 0L, ver);
 
                             deletedUnlocked(true);
 
@@ -3785,25 +3779,23 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
      * Updates cache index.
      *
      * @param val Value.
-     * @param valBytes Value bytes.
      * @param expireTime Expire time.
      * @param ver New entry version.
      * @param prevVal Previous value.
      * @throws IgniteCheckedException If update failed.
      */
     protected void updateIndex(@Nullable CacheObject val,
-        @Nullable byte[] valBytes,
         long expireTime,
         GridCacheVersion ver,
         @Nullable CacheObject prevVal) throws IgniteCheckedException {
         assert Thread.holdsLock(this);
-        assert val != null || valBytes != null : "null values in update index for key: " + key;
+        assert val != null : "null values in update index for key: " + key;
 
         try {
             GridCacheQueryManager qryMgr = cctx.queries();
 
             if (qryMgr != null)
-                qryMgr.store(key, null, val, valBytes, ver, expireTime);
+                qryMgr.store(key, null, val, null, ver, expireTime);
         }
         catch (IgniteCheckedException e) {
             throw new GridCacheIndexUpdateException(e);
@@ -3920,7 +3912,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
                             clearIndex(prev);
 
                         // Nullify value after swap.
-                        value(null, null);
+                        value(null);
 
                         marked = true;
 
@@ -3963,7 +3955,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
                                 clearIndex(prevVal);
 
                             // Nullify value after swap.
-                            value(null, null);
+                            value(null);
 
                             marked = true;
 
@@ -4018,7 +4010,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
                         valClsLdrId);
                 }
 
-                value(null, null);
+                value(null);
             }
         }
         catch (GridCacheEntryRemovedException ignored) {
@@ -4491,7 +4483,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
                 }
 
                 if (detached())
-                    return rawGet().value(cctx, false);
+                    return CU.value(rawGet(), cctx, false);
 
                 for (;;) {
                     GridCacheEntryEx e = cctx.cache().peekEx(key);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
index a4d7384..fffa35f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
@@ -587,8 +587,12 @@ public abstract class GridCacheMessage implements Message {
 
         int size = col.size();
 
-        for (int i = 0 ; i < size; i++)
-            col.get(i).prepareMarshal(ctx.cacheObjectContext());
+        for (int i = 0 ; i < size; i++) {
+            CacheObject obj = col.get(i);
+
+            if (obj != null)
+                obj.prepareMarshal(ctx.cacheObjectContext());
+        }
     }
 
     /**
@@ -601,8 +605,10 @@ public abstract class GridCacheMessage implements Message {
         if (col == null)
             return;
 
-        for (CacheObject obj : col)
-            obj.prepareMarshal(ctx.cacheObjectContext());
+        for (CacheObject obj : col) {
+            if (obj != null)
+                obj.prepareMarshal(ctx.cacheObjectContext());
+        }
     }
 
     /**
@@ -622,8 +628,12 @@ public abstract class GridCacheMessage implements Message {
 
         int size = col.size();
 
-        for (int i = 0 ; i < size; i++)
-            col.get(i).finishUnmarshal(ctx, ldr);
+        for (int i = 0 ; i < size; i++) {
+            CacheObject obj = col.get(i);
+
+            if (obj != null)
+                obj.finishUnmarshal(ctx, ldr);
+        }
     }
 
     /**
@@ -640,8 +650,10 @@ public abstract class GridCacheMessage implements Message {
         if (col == null)
             return;
 
-        for (CacheObject obj : col)
-            obj.finishUnmarshal(ctx, ldr);
+        for (CacheObject obj : col) {
+            if (obj != null)
+                obj.finishUnmarshal(ctx, ldr);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index 4b21f8c..012d393 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -511,7 +511,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
             // First check off-heap store.
             if (readOffheap && offheapEnabled) {
-                byte[] bytes = offheap.get(spaceName, part, key, key.valueBytes(cctx));
+                byte[] bytes = offheap.get(spaceName, part, key, keyBytes);
 
                 if (bytes != null)
                     return swapEntry(unmarshalSwapEntry(bytes));
@@ -522,7 +522,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
             assert key != null;
 
-            byte[] bytes = swapMgr.read(spaceName, new SwapKey(key, part, keyBytes), cctx.deploy().globalLoader());
+            byte[] bytes = swapMgr.read(spaceName,
+                new SwapKey(key.value(cctx, false), part, keyBytes),
+                cctx.deploy().globalLoader());
 
             if (bytes == null && lsnr != null)
                 return lsnr.entry;
@@ -606,7 +608,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         final GridTuple<GridCacheSwapEntry> t = F.t1();
         final GridTuple<IgniteCheckedException> err = F.t1();
 
-        swapMgr.remove(spaceName, new SwapKey(key, part, key.valueBytes(cctx)), new CI1<byte[]>() {
+        swapMgr.remove(spaceName, new SwapKey(key.value(cctx, false), part, key.valueBytes(cctx)), new CI1<byte[]>() {
             @Override public void apply(byte[] rmv) {
                 if (rmv != null) {
                     try {
@@ -727,7 +729,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         int part = cctx.affinity().partition(key);
 
-        return read(key, CU.marshal(cctx.shared(), key), part, false, readOffheap, readSwap);
+        return read(key, key.valueBytes(cctx), part, false, readOffheap, readSwap);
     }
 
     /**
@@ -801,8 +803,11 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                     if (unprocessedKeys == null)
                         unprocessedKeys = new ArrayList<>(keys.size());
 
-                    unprocessedKeys.add(
-                        new SwapKey(key, cctx.affinity().partition(key), CU.marshal(cctx.shared(), key)));
+                    SwapKey swapKey = new SwapKey(key.value(cctx, false),
+                        cctx.affinity().partition(key),
+                        key.valueBytes(cctx));
+
+                    unprocessedKeys.add(swapKey);
                 }
             }
 
@@ -812,8 +817,13 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         else {
             unprocessedKeys = new ArrayList<>(keys.size());
 
-            for (KeyCacheObject key : keys)
-                unprocessedKeys.add(new SwapKey(key, cctx.affinity().partition(key), CU.marshal(cctx.shared(), key)));
+            for (KeyCacheObject key : keys) {
+                SwapKey swapKey = new SwapKey(key.value(cctx, false),
+                    cctx.affinity().partition(key),
+                    key.valueBytes(cctx));
+
+                unprocessedKeys.add(swapKey);
+            }
         }
 
         assert swapEnabled;
@@ -960,7 +970,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         // First try offheap.
         if (offheapEnabled) {
-            byte[] val = offheap.remove(spaceName, part, key, key.valueBytes(cctx));
+            byte[] val = offheap.remove(spaceName, part, key.value(cctx, false), key.valueBytes(cctx));
 
             if (val != null) {
                 if (c != null)
@@ -970,9 +980,12 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
             }
         }
 
-        if (swapEnabled)
-            swapMgr.remove(spaceName, new SwapKey(key, part, key.valueBytes(cctx)), c,
+        if (swapEnabled) {
+            swapMgr.remove(spaceName,
+                new SwapKey(key.value(cctx, false), part, key.valueBytes(cctx)),
+                c,
                 cctx.deploy().globalLoader());
+        }
     }
 
     /**
@@ -1060,8 +1073,13 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         else {
             Map<SwapKey, byte[]> batch = new LinkedHashMap<>();
 
-            for (GridCacheBatchSwapEntry entry : swapped)
-                batch.put(new SwapKey(entry.key(), entry.partition(), entry.key().valueBytes(cctx)), entry.marshal());
+            for (GridCacheBatchSwapEntry entry : swapped) {
+                SwapKey swapKey = new SwapKey(entry.key().value(cctx, false),
+                    entry.partition(),
+                    entry.key().valueBytes(cctx));
+
+                batch.put(swapKey, entry.marshal());
+            }
 
             swapMgr.writeAll(spaceName, batch, cctx.deploy().globalLoader());
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
index ad146fe..8ab6dec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
@@ -41,6 +41,7 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheOb
      */
     public KeyCacheObjectImpl(Object val, byte[] valBytes) {
         assert val != null;
+        assert valBytes != null || this instanceof UserKeyCacheObjectImpl : this;
 
         this.val = val;
         this.valBytes = valBytes;
@@ -61,8 +62,8 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheOb
     }
 
     /** {@inheritDoc} */
-    @Override public byte[] valueBytes(GridCacheContext ctx) {
-        assert valBytes != null;
+    @Override public byte[] valueBytes(GridCacheContext ctx) throws IgniteCheckedException {
+        assert valBytes != null : this;
 
         return valBytes;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserKeyCacheObjectImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserKeyCacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserKeyCacheObjectImpl.java
index 5c635a6..0b7bc2e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserKeyCacheObjectImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserKeyCacheObjectImpl.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
 
 /**
  * Cache object wrapping key provided by user. Need to be copied before stored in cache.
@@ -38,19 +39,29 @@ public class UserKeyCacheObjectImpl extends KeyCacheObjectImpl {
     }
 
     /** {@inheritDoc} */
+    @Override public byte[] valueBytes(GridCacheContext ctx) throws IgniteCheckedException {
+        if (valBytes == null)
+            valBytes = CU.marshal(ctx.shared(), val); // Marshal on first request and keep the bytes in the field.
+
+        return valBytes;
+    }
+
+    /** {@inheritDoc} Ensures key bytes exist; wraps a deserialized copy when {@code needCopy(ctx)} holds. */
     @Override public CacheObject prepareForCache(GridCacheContext ctx) {
-        if (needCopy(ctx)) {
-            try {
-                if (valBytes == null)
-                    valBytes = ctx.marshaller().marshal(val);
+        try {
+            if (valBytes == null)
+                valBytes = ctx.marshaller().marshal(val);
 
-                return new KeyCacheObjectImpl(ctx.marshaller().unmarshal(valBytes, ctx.deploy().globalLoader()), valBytes);
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException("Failed to marshal object: " + val, e);
+            if (needCopy(ctx)) {
+                Object copy = ctx.marshaller().unmarshal(valBytes, ctx.deploy().globalLoader());
+
+                return new KeyCacheObjectImpl(copy, valBytes);
             }
-        }
-        else
+
             return new KeyCacheObjectImpl(val, valBytes);
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException("Failed to marshal object: " + val, e);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index 497af0a..8dbabc6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -74,7 +74,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<Boolean> txLockAsync(
-        Collection<? extends K> keys,
+        Collection<KeyCacheObject> keys,
         long timeout,
         IgniteTxLocalEx tx,
         boolean isRead,
@@ -95,7 +95,15 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
         IgniteTxLocalEx tx = ctx.tm().userTxx();
 
         // Return value flag is true because we choose to bring values for explicit locks.
-        return lockAllAsync(keys, timeout, tx, false, false, /*retval*/true, null, -1L, filter);
+        return lockAllAsync(ctx.cacheKeysView(keys),
+            timeout,
+            tx,
+            false,
+            false,
+            /*retval*/true,
+            null,
+            -1L,
+            filter);
     }
 
     /**
@@ -110,7 +118,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
      * @param filter Optional filter.
      * @return Future for locks.
      */
-    protected abstract IgniteInternalFuture<Boolean> lockAllAsync(Collection<? extends K> keys,
+    protected abstract IgniteInternalFuture<Boolean> lockAllAsync(Collection<KeyCacheObject> keys,
         long timeout,
         @Nullable IgniteTxLocalEx tx,
         boolean isInvalidate,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
index 3dc9e45..5648ad4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.plugin.extensions.communication.*;
 import org.apache.ignite.transactions.*;
 import org.jetbrains.annotations.*;
@@ -87,9 +88,13 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
     @GridDirectTransient
     private Map<IgniteTxKey, GridCacheVersion> dhtVers;
 
-    /** Serialized map. */
-    @GridToStringExclude
-    private byte[] dhtVersBytes;
+    /** Keys of {@code dhtVers} as sent on the wire; same iteration order as {@code dhtVerVals}. */
+    @GridDirectCollection(IgniteTxKey.class)
+    private Collection<IgniteTxKey> dhtVerKeys;
+
+    /** Values of {@code dhtVers} as sent on the wire; same iteration order as {@code dhtVerKeys}. */
+    @GridDirectCollection(GridCacheVersion.class)
+    private Collection<GridCacheVersion> dhtVerVals;
 
     /** Group lock key, if any. */
     @GridToStringInclude
@@ -317,8 +322,16 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
         if (grpLockKey != null && grpLockKeyBytes == null)
             grpLockKeyBytes = ctx.marshaller().marshal(grpLockKey);
 
-        if (dhtVers != null && dhtVersBytes == null)
-            dhtVersBytes = ctx.marshaller().marshal(dhtVers);
+        if (dhtVers != null) {
+            for (IgniteTxKey key : dhtVers.keySet()) {
+                GridCacheContext cctx = ctx.cacheContext(key.cacheId());
+
+                key.prepareMarshal(cctx);
+            }
+
+            dhtVerKeys = dhtVers.keySet();
+            dhtVerVals = dhtVers.values();
+        }
 
         if (txNodes != null)
             txNodesBytes = ctx.marshaller().marshal(txNodes);
@@ -349,8 +362,23 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
         if (grpLockKeyBytes != null && grpLockKey == null)
             grpLockKey = ctx.marshaller().unmarshal(grpLockKeyBytes, ldr);
 
-        if (dhtVersBytes != null && dhtVers == null)
-            dhtVers = ctx.marshaller().unmarshal(dhtVersBytes, ldr);
+        if (dhtVerKeys != null && dhtVers == null) {
+            assert dhtVerVals != null;
+            assert dhtVerKeys.size() == dhtVerVals.size();
+
+            Iterator<IgniteTxKey> keyIt = dhtVerKeys.iterator();
+            Iterator<GridCacheVersion> verIt = dhtVerVals.iterator();
+
+            dhtVers = U.newHashMap(dhtVerKeys.size());
+
+            while (keyIt.hasNext()) {
+                IgniteTxKey key = keyIt.next();
+
+                key.finishUnmarshal(ctx.cacheContext(key.cacheId()), ldr);
+
+                dhtVers.put(key, verIt.next());
+            }
+        }
 
         if (txNodesBytes != null)
             txNodes = ctx.marshaller().unmarshal(txNodesBytes, ldr);
@@ -436,84 +464,90 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
                 writer.incrementState();
 
             case 9:
-                if (!writer.writeByteArray("dhtVersBytes", dhtVersBytes))
+                if (!writer.writeCollection("dhtVerKeys", dhtVerKeys, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 10:
-                if (!writer.writeByteArray("grpLockKeyBytes", grpLockKeyBytes))
+                if (!writer.writeCollection("dhtVerVals", dhtVerVals, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 11:
-                if (!writer.writeBoolean("invalidate", invalidate))
+                if (!writer.writeByteArray("grpLockKeyBytes", grpLockKeyBytes))
                     return false;
 
                 writer.incrementState();
 
             case 12:
-                if (!writer.writeByte("isolation", isolation != null ? (byte)isolation.ordinal() : -1))
+                if (!writer.writeBoolean("invalidate", invalidate))
                     return false;
 
                 writer.incrementState();
 
             case 13:
-                if (!writer.writeBoolean("onePhaseCommit", onePhaseCommit))
+                if (!writer.writeByte("isolation", isolation != null ? (byte)isolation.ordinal() : -1))
                     return false;
 
                 writer.incrementState();
 
             case 14:
-                if (!writer.writeBoolean("partLock", partLock))
+                if (!writer.writeBoolean("onePhaseCommit", onePhaseCommit))
                     return false;
 
                 writer.incrementState();
 
             case 15:
-                if (!writer.writeCollection("readsBytes", readsBytes, MessageCollectionItemType.BYTE_ARR))
+                if (!writer.writeBoolean("partLock", partLock))
                     return false;
 
                 writer.incrementState();
 
             case 16:
-                if (!writer.writeBoolean("sys", sys))
+                if (!writer.writeCollection("readsBytes", readsBytes, MessageCollectionItemType.BYTE_ARR))
                     return false;
 
                 writer.incrementState();
 
             case 17:
-                if (!writer.writeLong("threadId", threadId))
+                if (!writer.writeBoolean("sys", sys))
                     return false;
 
                 writer.incrementState();
 
             case 18:
-                if (!writer.writeLong("timeout", timeout))
+                if (!writer.writeLong("threadId", threadId))
                     return false;
 
                 writer.incrementState();
 
             case 19:
-                if (!writer.writeByteArray("txNodesBytes", txNodesBytes))
+                if (!writer.writeLong("timeout", timeout))
                     return false;
 
                 writer.incrementState();
 
             case 20:
-                if (!writer.writeInt("txSize", txSize))
+                if (!writer.writeByteArray("txNodesBytes", txNodesBytes))
                     return false;
 
                 writer.incrementState();
 
             case 21:
-                if (!writer.writeMessage("writeVer", writeVer))
+                if (!writer.writeInt("txSize", txSize))
                     return false;
 
                 writer.incrementState();
 
             case 22:
+                if (!writer.writeMessage("writeVer", writeVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 23:
                 if (!writer.writeCollection("writesBytes", writesBytes, MessageCollectionItemType.BYTE_ARR))
                     return false;
 
@@ -548,7 +582,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
                 reader.incrementState();
 
             case 9:
-                dhtVersBytes = reader.readByteArray("dhtVersBytes");
+                dhtVerKeys = reader.readCollection("dhtVerKeys", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -556,7 +590,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
                 reader.incrementState();
 
             case 10:
-                grpLockKeyBytes = reader.readByteArray("grpLockKeyBytes");
+                dhtVerVals = reader.readCollection("dhtVerVals", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -564,7 +598,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
                 reader.incrementState();
 
             case 11:
-                invalidate = reader.readBoolean("invalidate");
+                grpLockKeyBytes = reader.readByteArray("grpLockKeyBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -572,6 +606,14 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
                 reader.incrementState();
 
             case 12:
+                invalidate = reader.readBoolean("invalidate");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 13:
                 byte isolationOrd;
 
                 isolationOrd = reader.readByte("isolation");
@@ -583,7 +625,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
 
                 reader.incrementState();
 
-            case 13:
+            case 14:
                 onePhaseCommit = reader.readBoolean("onePhaseCommit");
 
                 if (!reader.isLastRead())
@@ -591,7 +633,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
 
                 reader.incrementState();
 
-            case 14:
+            case 15:
                 partLock = reader.readBoolean("partLock");
 
                 if (!reader.isLastRead())
@@ -599,7 +641,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
 
                 reader.incrementState();
 
-            case 15:
+            case 16:
                 readsBytes = reader.readCollection("readsBytes", MessageCollectionItemType.BYTE_ARR);
 
                 if (!reader.isLastRead())
@@ -607,7 +649,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
 
                 reader.incrementState();
 
-            case 16:
+            case 17:
                 sys = reader.readBoolean("sys");
 
                 if (!reader.isLastRead())
@@ -615,7 +657,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
 
                 reader.incrementState();
 
-            case 17:
+            case 18:
                 threadId = reader.readLong("threadId");
 
                 if (!reader.isLastRead())
@@ -623,7 +665,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
 
                 reader.incrementState();
 
-            case 18:
+            case 19:
                 timeout = reader.readLong("timeout");
 
                 if (!reader.isLastRead())
@@ -631,7 +673,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
 
                 reader.incrementState();
 
-            case 19:
+            case 20:
                 txNodesBytes = reader.readByteArray("txNodesBytes");
 
                 if (!reader.isLastRead())
@@ -639,7 +681,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
 
                 reader.incrementState();
 
-            case 20:
+            case 21:
                 txSize = reader.readInt("txSize");
 
                 if (!reader.isLastRead())
@@ -647,7 +689,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
 
                 reader.incrementState();
 
-            case 21:
+            case 22:
                 writeVer = reader.readMessage("writeVer");
 
                 if (!reader.isLastRead())
@@ -655,7 +697,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
 
                 reader.incrementState();
 
-            case 22:
+            case 23:
                 writesBytes = reader.readCollection("writesBytes", MessageCollectionItemType.BYTE_ARR);
 
                 if (!reader.isLastRead())
@@ -675,7 +717,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 23;
+        return 24;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index c1000ea..3ce5cd3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -500,12 +500,11 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                     if (!F.isEmpty(txEntry.entryProcessors()) || !F.isEmpty(txEntry.filters()))
                                         txEntry.cached().unswap(true, false);
 
-                                    GridTuple3<GridCacheOperation, CacheObject, byte[]> res =
+                                    IgniteBiTuple<GridCacheOperation, CacheObject> res =
                                         applyTransformClosures(txEntry, false);
 
                                     GridCacheOperation op = res.get1();
                                     CacheObject val = res.get2();
-                                    byte[] valBytes = res.get3();
 
                                     GridCacheVersion explicitVer = txEntry.conflictVersion();
 
@@ -556,7 +555,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                                 near() ? null : explicitVer, CU.subjectId(this, cctx),
                                                 resolveTaskName());
                                         else {
-                                            cached.innerSet(this, eventNodeId(), nodeId, val, valBytes, false, false,
+                                            cached.innerSet(this, eventNodeId(), nodeId, val, false, false,
                                                 txEntry.ttl(), true, true, topVer, txEntry.filters(),
                                                 replicate ? DR_BACKUP : DR_NONE, txEntry.conflictExpireTime(),
                                                 near() ? null : explicitVer, CU.subjectId(this, cctx),
@@ -565,7 +564,6 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                             // Keep near entry up to date.
                                             if (nearCached != null) {
                                                 CacheObject val0 = null;
-                                                byte[] valBytes0 = null;
 
                                                 GridCacheValueBytes valBytesTuple = cached.valueBytes();
 
@@ -579,8 +577,11 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                                 else
                                                     val0 = cached.rawGet();
 
-                                                nearCached.updateOrEvict(xidVer, val0, valBytes0, cached.expireTime(),
-                                                    cached.ttl(), nodeId);
+                                                nearCached.updateOrEvict(xidVer,
+                                                    val0,
+                                                    cached.expireTime(),
+                                                    cached.ttl(),
+                                                    nodeId);
                                             }
                                         }
                                     }
@@ -591,7 +592,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
 
                                         // Keep near entry up to date.
                                         if (nearCached != null)
-                                            nearCached.updateOrEvict(xidVer, null, null, 0, 0, nodeId);
+                                            nearCached.updateOrEvict(xidVer, null, 0, 0, nodeId);
                                     }
                                     else if (op == RELOAD) {
                                         CacheObject reloaded = cached.innerReload();
@@ -599,7 +600,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                         if (nearCached != null) {
                                             nearCached.innerReload();
 
-                                            nearCached.updateOrEvict(cached.version(), reloaded, null,
+                                            nearCached.updateOrEvict(cached.version(), reloaded,
                                                 cached.expireTime(), cached.ttl(), nodeId);
                                         }
                                     }
@@ -621,7 +622,6 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
 
                                             if (nearCached != null) {
                                                 CacheObject val0 = null;
-                                                byte[] valBytes0 = null;
 
                                                 GridCacheValueBytes valBytesTuple = cached.valueBytes();
 
@@ -635,8 +635,11 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                                 else
                                                     val0 = cached.rawGet();
 
-                                                nearCached.updateOrEvict(xidVer, val0, valBytes0, cached.expireTime(),
-                                                    cached.ttl(), nodeId);
+                                                nearCached.updateOrEvict(xidVer,
+                                                    val0,
+                                                    cached.expireTime(),
+                                                    cached.ttl(),
+                                                    nodeId);
                                             }
                                         }
                                     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 298e522..5febfcc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -458,7 +458,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
 
                     entry = entryEx(key, false);
 
-                    entry.initialValue(cacheVal, null, ver, ttl, CU.EXPIRE_TIME_CALCULATE, false, topVer,
+                    entry.initialValue(cacheVal, ver, ttl, CU.EXPIRE_TIME_CALCULATE, false, topVer,
                         replicate ? DR_LOAD : DR_NONE);
                 }
                 catch (IgniteCheckedException e) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index 09f1629..6e4eac8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -311,22 +311,18 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
         if (isNew() || !valid(-1) || deletedUnlocked())
             return null;
         else {
-            CacheObject val0 = null;
-            byte[] valBytes0 = null;
-
-// TODO IGNITE-51.
-//            GridCacheValueBytes valBytesTuple = valueBytesUnlocked();
-//
-//            if (!valBytesTuple.isNull()) {
-//                if (valBytesTuple.isPlain())
-//                    val0 = (V)valBytesTuple.get();
-//                else
-//                    valBytes0 = valBytesTuple.get();
-//            }
-//            else
-//                val0 = val;
-
-            return F.t(ver, val0, valBytes0);
+            CacheObject val0 = val;
+
+            if (val0 == null && valPtr != 0) {
+                IgniteBiTuple<byte[], Boolean> t = valueBytes0();
+
+                if (t.get2())
+                    val0 = cctx.toCacheObject(t.get1(), null);
+                else
+                    val0 = cctx.toCacheObject(null, t.get1());
+            }
+
+            return F.t(ver, val0, null);
         }
     }
 
@@ -563,7 +559,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
                 clearIndex(prev);
 
                 // Give to GC.
-                update(null, null, 0L, 0L, ver);
+                update(null, 0L, 0L, ver);
 
                 if (swap) {
                     releaseSwap();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index 516b2bb..e9674c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -357,14 +357,14 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
                         expiryPlc,
                         skipVals);
                 }
-// TODO IGNITE-51.
-//                else {
-//                    fut = tx.getAllAsync(cctx,
-//                        keys.keySet(),
-//                        null,
-//                        deserializePortable,
-//                        skipVals);
-//                }
+                else {
+                    fut = tx.getAllAsync(cctx,
+                        keys.keySet(),
+                        null,
+                        false,
+                        skipVals,
+                        true);
+                }
             }
         }
         else {
@@ -394,13 +394,12 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
                                     expiryPlc, skipVals);
                             }
                             else {
-                                return null;
-// TODO IGNITE-51.
-//                                return tx.getAllAsync(cctx,
-//                                    keys.keySet(),
-//                                    null,
-//                                    false,
-//                                    skipVals);
+                                return tx.getAllAsync(cctx,
+                                    keys.keySet(),
+                                    null,
+                                    false,
+                                    skipVals,
+                                    true);
                             }
                         }
                     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index 126d641..056c3ad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -1075,7 +1075,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
                     try {
                         GridCacheEntryEx entry = cctx.cache().entryEx(info.key(), topVer);
 
-                        if (entry.initialValue(info.value(), null, info.version(), info.ttl(),
+                        if (entry.initialValue(info.value(), info.version(), info.ttl(),
                             info.expireTime(), true, topVer, replicate ? DR_PRELOAD : DR_NONE)) {
                             if (rec && !entry.isInternal())
                                 cctx.events().addEvent(entry.partition(), entry.key(), cctx.localNodeId(),

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 7c7e42c..449702b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -548,7 +548,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<Boolean> lockAllAsync(
-        @Nullable Collection<? extends K> keys,
+        @Nullable Collection<KeyCacheObject> keys,
         long timeout,
         IgniteTxLocalEx txx,
         boolean isInvalidate,
@@ -583,7 +583,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
      * @param filter Optional filter.
      * @return Lock future.
      */
-    public GridDhtFuture<Boolean> lockAllAsyncInternal(@Nullable Collection<? extends K> keys,
+    public GridDhtFuture<Boolean> lockAllAsyncInternal(@Nullable Collection<KeyCacheObject> keys,
         long timeout,
         IgniteTxLocalEx txx,
         boolean isInvalidate,
@@ -612,16 +612,10 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
             accessTtl,
             filter);
 
-        for (K key : keys) {
-            if (key == null)
-                continue;
-
-            // TODO IGNITE-51.
-            KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
-
+        for (KeyCacheObject key : keys) {
             try {
                 while (true) {
-                    GridDhtCacheEntry entry = entryExx(cacheKey, tx.topologyVersion());
+                    GridDhtCacheEntry entry = entryExx(key, tx.topologyVersion());
 
                     try {
                         fut.addEntry(entry);


[4/6] incubator-ignite git commit: # ignite-51

Posted by sb...@apache.org.
# ignite-51


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ea5bd46a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ea5bd46a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ea5bd46a

Branch: refs/heads/ignite-51
Commit: ea5bd46aebbe67357b2a20f58027d53a8323c17e
Parents: a040311 29508fa
Author: sboikov <sb...@gridgain.com>
Authored: Mon Mar 2 16:07:13 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Mar 2 16:25:32 2015 +0300

----------------------------------------------------------------------
 .../distributed/near/GridNearAtomicCache.java   |   5 +-
 .../distributed/near/GridNearCacheAdapter.java  |   5 +-
 .../distributed/near/GridNearCacheEntry.java    |  46 +++--
 .../distributed/near/GridNearGetFuture.java     |  27 +--
 .../near/GridNearTransactionalCache.java        |   7 +-
 .../communication/GridCacheMessageSelfTest.java | 172 +++++++++++++++++++
 6 files changed, 205 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea5bd46a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 4ebcced,a30f372..2358013
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@@ -367,12 -369,12 +367,15 @@@ public class GridNearAtomicCache<K, V> 
          if (F.isEmpty(keys))
              return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap());
  
++        if (keyCheck)
++            validateCacheKeys(keys);
++
          GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall();
  
          subjId = ctx.subjectIdPerCall(subjId, prj);
  
          return loadAsync(null,
--            keys,
++            ctx.cacheKeysView(keys),
              false,
              forcePrimary,
              subjId,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea5bd46a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index b8f8c6f,a32b6a8..e34019b
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@@ -275,7 -275,7 +275,7 @@@ public abstract class GridNearCacheAdap
       * @return Loaded values.
       */
      public IgniteInternalFuture<Map<K, V>> loadAsync(@Nullable IgniteInternalTx tx,
--        @Nullable Collection<? extends K> keys,
++        @Nullable Collection<KeyCacheObject> keys,
          boolean reload,
          boolean forcePrimary,
          @Nullable UUID subjId,
@@@ -287,9 -287,9 +287,6 @@@
          if (F.isEmpty(keys))
              return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap());
  
--        if (keyCheck)
--            validateCacheKeys(keys);
--
          IgniteTxLocalEx txx = (tx != null && tx.local()) ? (IgniteTxLocalEx)tx : null;
  
          final IgniteCacheExpiryPolicy expiry = expiryPolicy(expiryPlc);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea5bd46a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index 2d859a0,2f6bebb..911f77b
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@@ -273,22 -280,22 +273,18 @@@ public class GridNearCacheEntry extend
          if (dhtVer == null)
              return null;
          else {
--            CacheObject val0 = null;
--            byte[] valBytes0 = null;
- 
- // TODO IGNITE-51.
- //            GridCacheValueBytes valBytesTuple = valueBytes();
- //
- //            if (!valBytesTuple.isNull()) {
- //                if (valBytesTuple.isPlain())
- //                    val0 = (V)valBytesTuple.get();
- //                else
- //                    valBytes0 = valBytesTuple.get();
- //            }
- //            else
- //                val0 = val;
- 
-             return F.t(dhtVer, val0, valBytes0);
++            CacheObject val0 = val;
+ 
 -// TODO IGNITE-51.
 -//            GridCacheValueBytes valBytesTuple = valueBytes();
 -//
 -//            if (!valBytesTuple.isNull()) {
 -//                if (valBytesTuple.isPlain())
 -//                    val0 = (V)valBytesTuple.get();
 -//                else
 -//                    valBytes0 = valBytesTuple.get();
 -//            }
 -//            else
 -//                val0 = val;
 -
 -            return F.t(dhtVer, val0, valBytes0);
++            if (val0 == null && valPtr != 0) {
++                IgniteBiTuple<byte[], Boolean> t = valueBytes0();
++
++                if (t.get2())
++                    val0 = cctx.toCacheObject(t.get1(), null);
++                else
++                    val0 = cctx.toCacheObject(null, t.get1());
++            }
++
++            return F.t(ver, val0, null);
          }
      }
  
@@@ -320,17 -327,17 +316,15 @@@
      /** {@inheritDoc} */
      @Override protected Object readThrough(IgniteInternalTx tx, KeyCacheObject key, boolean reload,
          UUID subjId, String taskName) throws IgniteCheckedException {
--        return null;
--// TODO IGNTIE-51.
--//        return cctx.near().loadAsync(tx,
--//            F.asList(key),
--//            reload,
--//            /*force primary*/false,
--//            subjId,
--//            taskName,
--//            true,
--//            null,
--//            false).get().get(key);
++        return cctx.near().loadAsync(tx,
++            F.asList(key),
++            reload,
++            /*force primary*/false,
++            subjId,
++            taskName,
++            true,
++            null,
++            false).get().get(key.value(cctx, false));
      }
  
      /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea5bd46a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index ef76366,ef76366..31481f2
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@@ -63,7 -63,7 +63,7 @@@ public final class GridNearGetFuture<K
      private GridCacheContext<K, V> cctx;
  
      /** Keys. */
--    private Collection<? extends K> keys;
++    private Collection<KeyCacheObject> keys;
  
      /** Reload flag. */
      private boolean reload;
@@@ -130,7 -130,7 +130,7 @@@
       */
      public GridNearGetFuture(
          GridCacheContext<K, V> cctx,
--        Collection<? extends K> keys,
++        Collection<KeyCacheObject> keys,
          boolean readThrough,
          boolean reload,
          boolean forcePrimary,
@@@ -170,21 -170,21 +170,7 @@@
      public void init() {
          long topVer = tx == null ? cctx.affinity().affinityTopologyVersion() : tx.topologyVersion();
  
--        Collection<KeyCacheObject> keys0 = F.viewReadOnly(keys, new C1<K, KeyCacheObject>() {
--            @Override public KeyCacheObject apply(K key) {
--                if (key == null) {
--                    NullPointerException err = new NullPointerException("Null key.");
--
--                    onDone(err);
--
--                    throw err;
--                }
--
--                return cctx.toCacheKeyObject(key);
--            }
--        });
--
--        map(keys0, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), topVer);
++        map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), topVer);
  
          markInitialized();
      }
@@@ -199,13 -199,13 +185,6 @@@
          // Should not flip trackable flag from true to false since get future can be remapped.
      }
  
--    /**
--     * @return Keys.
--     */
--    Collection<? extends K> keys() {
--        return keys;
--    }
--
      /** {@inheritDoc} */
      @Override public IgniteUuid futureId() {
          return futId;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea5bd46a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index e35e0fd,6e88c4f..7c1243e
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@@ -111,6 -111,6 +111,9 @@@ public class GridNearTransactionalCache
          if (F.isEmpty(keys))
              return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap());
  
++        if (keyCheck)
++            validateCacheKeys(keys);
++
          IgniteTxLocalAdapter tx = ctx.tm().threadLocalTx();
  
          if (tx != null && !tx.implicit() && !skipTx) {
@@@ -131,7 -126,7 +134,7 @@@
          subjId = ctx.subjectIdPerCall(subjId, prj);
  
          return loadAsync(null,
--            keys,
++            ctx.cacheKeysView(keys),
              false,
              forcePrimary,
              subjId,
@@@ -150,7 -145,7 +153,7 @@@
       * @return Future.
       */
      IgniteInternalFuture<Map<K, V>> txLoadAsync(GridNearTxLocal tx,
--        @Nullable Collection<? extends K> keys,
++        @Nullable Collection<KeyCacheObject> keys,
          boolean readThrough,
          boolean deserializePortable,
          @Nullable IgniteCacheExpiryPolicy expiryPlc,


[2/6] incubator-ignite git commit: # ignite-51

Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 45b13b7..d2b7d36 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -464,24 +464,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
 
                 entry.cached(cached, null);
 
-// TODO IGNITE-51.
-//                while (true) {
-//                    GridDhtCacheEntry cached = dhtCache.entryExx(entry.key(), topologyVersion());
-//
-//                    try {
-//                        // Set key bytes to avoid serializing in future.
-//                        cached.keyBytes(entry.keyBytes());
-//
-//                        entry.cached(cached, null);
-//
-//                        break;
-//                    }
-//                    catch (GridCacheEntryRemovedException ignore) {
-//                        if (log.isDebugEnabled())
-//                            log.debug("Got removed entry when adding to dht tx (will retry): " + cached);
-//                    }
-//                }
-
                 GridCacheVersion explicit = entry.explicitVersion();
 
                 if (explicit != null) {
@@ -641,17 +623,15 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
 
         GridDhtTransactionalCacheAdapter<?, ?> dhtCache = cacheCtx.isNear() ? cacheCtx.nearTx().dht() : cacheCtx.dhtTx();
 
-        IgniteInternalFuture<Boolean> fut = null;
-// TODO IGNTIE-51
-//        IgniteInternalFuture<Boolean> fut = dhtCache.lockAllAsyncInternal(passedKeys,
-//            lockTimeout(),
-//            this,
-//            isInvalidate(),
-//            read,
-//            /*retval*/false,
-//            isolation,
-//            accessTtl,
-//            CU.empty());
+        IgniteInternalFuture<Boolean> fut = dhtCache.lockAllAsyncInternal(passedKeys,
+            lockTimeout(),
+            this,
+            isInvalidate(),
+            read,
+            /*retval*/false,
+            isolation,
+            accessTtl,
+            (IgnitePredicate[])CU.empty());
 
         return new GridEmbeddedFuture<>(
             fut,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 369935b..7f9022a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -340,7 +340,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
                             }
 
                             if (err != null || procRes != null)
-                                ret.addEntryProcessResult(key,
+                                ret.addEntryProcessResult(key.value(cacheCtx, false),
                                     err == null ? new CacheInvokeResult<>(procRes) : new CacheInvokeResult<>(err));
                             else
                                 ret.invokeResult(true);
@@ -1240,7 +1240,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
                         GridDrType drType = cacheCtx.isDrEnabled() ? GridDrType.DR_PRELOAD : GridDrType.DR_NONE;
 
                         try {
-                            if (entry.initialValue(info.value(), null, info.version(),
+                            if (entry.initialValue(info.value(), info.version(),
                                 info.ttl(), info.expireTime(), true, topVer, drType)) {
                                 if (rec && !entry.isInternal())
                                     cacheCtx.events().addEvent(entry.partition(), entry.key(), cctx.localNodeId(),

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
index 2423fe1..ec45af1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
@@ -328,73 +328,73 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
         }
 
         switch (writer.state()) {
-            case 23:
+            case 24:
                 if (!writer.writeIgniteUuid("futId", futId))
                     return false;
 
                 writer.incrementState();
 
-            case 24:
+            case 25:
                 if (!writer.writeBitSet("invalidateNearEntries", invalidateNearEntries))
                     return false;
 
                 writer.incrementState();
 
-            case 25:
+            case 26:
                 if (!writer.writeBoolean("last", last))
                     return false;
 
                 writer.incrementState();
 
-            case 26:
+            case 27:
                 if (!writer.writeIgniteUuid("miniId", miniId))
                     return false;
 
                 writer.incrementState();
 
-            case 27:
+            case 28:
                 if (!writer.writeUuid("nearNodeId", nearNodeId))
                     return false;
 
                 writer.incrementState();
 
-            case 28:
+            case 29:
                 if (!writer.writeCollection("nearWritesBytes", nearWritesBytes, MessageCollectionItemType.BYTE_ARR))
                     return false;
 
                 writer.incrementState();
 
-            case 29:
+            case 30:
                 if (!writer.writeMessage("nearXidVer", nearXidVer))
                     return false;
 
                 writer.incrementState();
 
-            case 30:
+            case 31:
                 if (!writer.writeByteArray("ownedBytes", ownedBytes))
                     return false;
 
                 writer.incrementState();
 
-            case 31:
+            case 32:
                 if (!writer.writeBitSet("preloadKeys", preloadKeys))
                     return false;
 
                 writer.incrementState();
 
-            case 32:
+            case 33:
                 if (!writer.writeUuid("subjId", subjId))
                     return false;
 
                 writer.incrementState();
 
-            case 33:
+            case 34:
                 if (!writer.writeInt("taskNameHash", taskNameHash))
                     return false;
 
                 writer.incrementState();
 
-            case 34:
+            case 35:
                 if (!writer.writeLong("topVer", topVer))
                     return false;
 
@@ -416,7 +416,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
             return false;
 
         switch (reader.state()) {
-            case 23:
+            case 24:
                 futId = reader.readIgniteUuid("futId");
 
                 if (!reader.isLastRead())
@@ -424,7 +424,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 24:
+            case 25:
                 invalidateNearEntries = reader.readBitSet("invalidateNearEntries");
 
                 if (!reader.isLastRead())
@@ -432,7 +432,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 25:
+            case 26:
                 last = reader.readBoolean("last");
 
                 if (!reader.isLastRead())
@@ -440,7 +440,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 26:
+            case 27:
                 miniId = reader.readIgniteUuid("miniId");
 
                 if (!reader.isLastRead())
@@ -448,7 +448,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 27:
+            case 28:
                 nearNodeId = reader.readUuid("nearNodeId");
 
                 if (!reader.isLastRead())
@@ -456,7 +456,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 28:
+            case 29:
                 nearWritesBytes = reader.readCollection("nearWritesBytes", MessageCollectionItemType.BYTE_ARR);
 
                 if (!reader.isLastRead())
@@ -464,7 +464,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 29:
+            case 30:
                 nearXidVer = reader.readMessage("nearXidVer");
 
                 if (!reader.isLastRead())
@@ -472,7 +472,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 30:
+            case 31:
                 ownedBytes = reader.readByteArray("ownedBytes");
 
                 if (!reader.isLastRead())
@@ -480,7 +480,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 31:
+            case 32:
                 preloadKeys = reader.readBitSet("preloadKeys");
 
                 if (!reader.isLastRead())
@@ -488,7 +488,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 32:
+            case 33:
                 subjId = reader.readUuid("subjId");
 
                 if (!reader.isLastRead())
@@ -496,7 +496,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 33:
+            case 34:
                 taskNameHash = reader.readInt("taskNameHash");
 
                 if (!reader.isLastRead())
@@ -504,7 +504,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 34:
+            case 35:
                 topVer = reader.readLong("topVer");
 
                 if (!reader.isLastRead())
@@ -524,6 +524,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 35;
+        return 36;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 96777c9..603141b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -60,7 +60,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
     private GridCacheContext<K, V> cctx;
 
     /** Keys. */
-    private Collection<? extends K> keys;
+    private Collection<KeyCacheObject> keys;
 
     /** Topology version. */
     private long topVer;
@@ -127,7 +127,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
      */
     public GridPartitionedGetFuture(
         GridCacheContext<K, V> cctx,
-        Collection<? extends K> keys,
+        Collection<KeyCacheObject> keys,
         long topVer,
         boolean readThrough,
         boolean reload,
@@ -167,21 +167,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
     public void init() {
         long topVer = this.topVer > 0 ? this.topVer : cctx.affinity().affinityTopologyVersion();
 
-        Collection<KeyCacheObject> keys0 = F.viewReadOnly(keys, new C1<K, KeyCacheObject>() {
-            @Override public KeyCacheObject apply(K key) {
-                if (key == null) {
-                    NullPointerException err = new NullPointerException("Null key.");
-
-                    onDone(err);
-
-                    throw err;
-                }
-
-                return cctx.toCacheKeyObject(key);
-            }
-        });
-
-        map(keys0, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), topVer);
+        map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), topVer);
 
         markInitialized();
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 5e38db3..adcaebf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -276,6 +276,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         final boolean deserializePortable,
         final boolean skipVals
     ) {
+        ctx.checkSecurity(GridSecurityPermission.CACHE_READ);
+
+        if (F.isEmpty(keys))
+            return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap());
+
+        if (keyCheck)
+            validateCacheKeys(keys);
+
         GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall();
 
         subjId = ctx.subjectIdPerCall(null, prj);
@@ -286,7 +294,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         return asyncOp(new CO<IgniteInternalFuture<Map<K, V>>>() {
             @Override public IgniteInternalFuture<Map<K, V>> apply() {
-                return getAllAsync0(keys,
+                return getAllAsync0(ctx.cacheKeysView(keys),
                     false,
                     forcePrimary,
                     subjId0,
@@ -624,7 +632,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override protected IgniteInternalFuture<Boolean> lockAllAsync(Collection<? extends K> keys,
+    @Override protected IgniteInternalFuture<Boolean> lockAllAsync(Collection<KeyCacheObject> keys,
         long timeout,
         @Nullable IgniteTxLocalEx tx,
         boolean isInvalidate,
@@ -895,7 +903,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param skipVals Skip values flag.
      * @return Get future.
      */
-    private IgniteInternalFuture<Map<K, V>> getAllAsync0(@Nullable Collection<? extends K> keys,
+    private IgniteInternalFuture<Map<K, V>> getAllAsync0(@Nullable Collection<KeyCacheObject> keys,
         boolean reload,
         boolean forcePrimary,
         UUID subjId,
@@ -903,14 +911,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         boolean deserializePortable,
         @Nullable ExpiryPolicy expiryPlc,
         boolean skipVals) {
-        ctx.checkSecurity(GridSecurityPermission.CACHE_READ);
-
-        if (F.isEmpty(keys))
-            return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap());
-
-        if (keyCheck)
-            validateCacheKeys(keys);
-
         long topVer = ctx.affinity().affinityTopologyVersion();
 
         final IgniteCacheExpiryPolicy expiry = skipVals ? null : expiryPolicy(expiryPlc);
@@ -922,17 +922,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             boolean success = true;
 
             // Optimistically expect that all keys are available locally (avoid creation of get future).
-            for (K key : keys) {
-                if (key == null)
-                    throw new NullPointerException("Null key.");
-
-                KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
-
+            for (KeyCacheObject key : keys) {
                 GridCacheEntryEx entry = null;
 
                 while (true) {
                     try {
-                        entry = ctx.isSwapOrOffheapEnabled() ? entryEx(cacheKey) : peekEx(cacheKey);
+                        entry = ctx.isSwapOrOffheapEnabled() ? entryEx(key) : peekEx(key);
 
                         // If our DHT cache do has value, then we peek it.
                         if (entry != null) {
@@ -956,12 +951,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 GridCacheVersion obsoleteVer = context().versions().next();
 
                                 if (isNew && entry.markObsoleteIfEmpty(obsoleteVer))
-                                    removeIfObsolete(cacheKey);
+                                    removeIfObsolete(key);
 
                                 success = false;
                             }
                             else
-                                ctx.addResult(locVals, cacheKey, v, skipVals, false, deserializePortable, false);
+                                ctx.addResult(locVals, key, v, skipVals, false, deserializePortable, false);
                         }
                         else
                             success = false;
@@ -998,7 +993,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             if (success) {
                 sendTtlUpdateRequest(expiry);
 
-                return ctx.wrapCloneMap(new GridFinishedFuture<>(ctx.kernalContext(), locVals));
+                return new GridFinishedFuture<>(ctx.kernalContext(), locVals);
             }
         }
 
@@ -1020,7 +1015,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         fut.init();
 
-        return ctx.wrapCloneMap(fut);
+        return fut;
     }
 
     /**
@@ -1700,7 +1695,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     locNodeId,
                     op,
                     writeVal,
-                    null,
                     req.invokeArguments(),
                     primary && writeThrough(),
                     req.returnValue(),
@@ -1965,7 +1959,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         op,
                         writeVal,
                         null,
-                        null,
                         false,
                         false,
                         expiry,
@@ -2460,7 +2453,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             nodeId,
                             op,
                             op == TRANSFORM ? entryProcessor : val,
-                            null,
                             op == TRANSFORM ? req.invokeArguments() : null,
                             /*write-through*/false,
                             /*retval*/false,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index e2b974c..a9a26c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -327,7 +327,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
 
         GridCacheReturn ret = (GridCacheReturn)res;
 
-        if (op != TRANSFORM) {
+        if (op != TRANSFORM && ret != null) {
             CacheObject val = (CacheObject)ret.value();
 
             ret.value(CU.value(val, cctx, false));
@@ -357,7 +357,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
         if (res.remapKeys() != null) {
             assert cctx.config().getAtomicWriteOrderMode() == PRIMARY;
 
-            // TODO IGNITE-51.
             mapOnTopology(res.remapKeys(), true, nodeId);
 
             return;
@@ -809,7 +808,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
                 cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
 
                 if (syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY)
-                    onDone(new GridCacheReturn<Object>(null, true));
+                    onDone(new GridCacheReturn(null, true));
             }
             catch (IgniteCheckedException e) {
                 onDone(addFailedKeys(req.keys(), e));
@@ -928,8 +927,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
         if (err0 == null)
             err0 = this.err = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
 
-        // TODO IGNITE-51.
-        err0.add(failedKeys, err);
+        List<Object> keys = new ArrayList<>(failedKeys.size());
+
+        for (KeyCacheObject key : failedKeys)
+            keys.add(key.value(cctx, false));
+
+        err0.add(keys, err);
 
         return err0;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 84b32ce..a22b31c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -183,12 +183,20 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
         if (F.isEmpty(keys))
             return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap());
 
+        if (keyCheck)
+            validateCacheKeys(keys);
+
         IgniteTxLocalAdapter tx = ctx.tm().threadLocalTx();
 
         if (tx != null && !tx.implicit() && !skipTx) {
             return asyncOp(tx, new AsyncOp<Map<K, V>>(keys) {
                 @Override public IgniteInternalFuture<Map<K, V>> op(IgniteTxLocalAdapter tx) {
-                    return ctx.wrapCloneMap(tx.<K, V>getAllAsync(ctx, keys, entry, deserializePortable, skipVals));
+                    return tx.getAllAsync(ctx,
+                        ctx.cacheKeysView(keys),
+                        entry,
+                        deserializePortable,
+                        skipVals,
+                        false);
                 }
             });
         }
@@ -200,7 +208,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
         subjId = ctx.subjectIdPerCall(subjId, prj);
 
         return loadAsync(
-            keys,
+            ctx.cacheKeysView(keys),
             true,
             false,
             forcePrimary,
@@ -232,9 +240,10 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
      * @param taskName Task name.
      * @param deserializePortable Deserialize portable flag.
      * @param expiryPlc Expiry policy.
+     * @param skipVals Skip values flag.
      * @return Loaded values.
      */
-    public IgniteInternalFuture<Map<K, V>> loadAsync(@Nullable Collection<? extends K> keys,
+    public IgniteInternalFuture<Map<K, V>> loadAsync(@Nullable Collection<KeyCacheObject> keys,
         boolean readThrough,
         boolean reload,
         boolean forcePrimary,
@@ -248,9 +257,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
         if (keys == null || keys.isEmpty())
             return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap());
 
-        if (keyCheck)
-            validateCacheKeys(keys);
-
         if (expiryPlc == null)
             expiryPlc = expiryPolicy(null);
 
@@ -261,17 +267,12 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
             boolean success = true;
 
             // Optimistically expect that all keys are available locally (avoid creation of get future).
-            for (K key : keys) {
-                if (key == null)
-                    throw new NullPointerException("Null key.");
-
+            for (KeyCacheObject key : keys) {
                 GridCacheEntryEx entry = null;
 
-                KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
-
                 while (true) {
                     try {
-                        entry = ctx.isSwapOrOffheapEnabled() ? entryEx(cacheKey) : peekEx(cacheKey);
+                        entry = ctx.isSwapOrOffheapEnabled() ? entryEx(key) : peekEx(key);
 
                         // If our DHT cache do has value, then we peek it.
                         if (entry != null) {
@@ -295,12 +296,12 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
                                 GridCacheVersion obsoleteVer = context().versions().next();
 
                                 if (isNew && entry.markObsoleteIfEmpty(obsoleteVer))
-                                    removeIfObsolete(cacheKey);
+                                    removeIfObsolete(key);
 
                                 success = false;
                             }
                             else
-                                ctx.addResult(locVals, cacheKey, v, skipVals, false, deserializePortable, true);
+                                ctx.addResult(locVals, key, v, skipVals, false, deserializePortable, true);
                         }
                         else
                             success = false;
@@ -337,7 +338,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
             if (success) {
                 sendTtlUpdateRequest(expiryPlc);
 
-                return ctx.wrapCloneMap(new GridFinishedFuture<>(ctx.kernalContext(), locVals));
+                return new GridFinishedFuture<>(ctx.kernalContext(), locVals);
             }
         }
 
@@ -360,7 +361,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
 
         fut.init();
 
-        return ctx.wrapCloneMap(fut);
+        return fut;
     }
 
     /**
@@ -369,7 +370,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
      * {@inheritDoc}
      */
     @Override public IgniteInternalFuture<Boolean> lockAllAsync(
-        Collection<? extends K> keys,
+        Collection<KeyCacheObject> keys,
         long timeout,
         @Nullable IgniteTxLocalEx tx,
         boolean isInvalidate,
@@ -426,7 +427,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
             Collection<KeyCacheObject> locKeys = new ArrayList<>();
 
             for (K key : keys) {
-                // TODO IGNITE-51.
                 KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
 
                 GridDistributedCacheEntry entry = peekExx(cacheKey);
@@ -519,7 +519,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
      * @param ver Lock version.
      * @param keys Keys.
      */
-    public void removeLocks(long threadId, GridCacheVersion ver, Collection<? extends K> keys) {
+    public void removeLocks(long threadId, GridCacheVersion ver, Collection<KeyCacheObject> keys) {
         if (keys.isEmpty())
             return;
 
@@ -530,11 +530,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
 
             Collection<KeyCacheObject> locKeys = new LinkedList<>();
 
-            for (K key : keys) {
-                // TODO IGNITE-51.
-                KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
-
-                GridCacheMvccCandidate lock = ctx.mvcc().removeExplicitLock(threadId, cacheKey, ver);
+            for (KeyCacheObject key : keys) {
+                GridCacheMvccCandidate lock = ctx.mvcc().removeExplicitLock(threadId, key, ver);
 
                 if (lock != null) {
                     long topVer = lock.topologyVersion();
@@ -559,14 +556,14 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
                             req.version(ver);
                         }
 
-                        GridCacheEntryEx entry = peekEx(cacheKey);
+                        GridCacheEntryEx entry = peekEx(key);
 
-                        KeyCacheObject key0 = entry != null ? entry.key() : cacheKey;
+                        KeyCacheObject key0 = entry != null ? entry.key() : key;
 
                         req.addKey(key0, ctx);
                     }
                     else
-                        locKeys.add(cacheKey);
+                        locKeys.add(key);
                 }
             }
 
@@ -616,7 +613,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
         final long threadId,
         final GridCacheVersion ver,
         final long topVer,
-        final Collection<K> keys,
+        final Collection<KeyCacheObject> keys,
         final boolean txRead,
         final long timeout,
         final long accessTtl,
@@ -624,9 +621,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
     ) {
         assert keys != null;
 
-        // TODO IGNITE-51.
-        // IgniteInternalFuture<Object> keyFut = ctx.dht().dhtPreloader().request(keys, topVer);
-        IgniteInternalFuture<Object> keyFut = null;
+        IgniteInternalFuture<Object> keyFut = ctx.dht().dhtPreloader().request(keys, topVer);
 
         // Prevent embedded future creation if possible.
         if (keyFut.isDone()) {
@@ -691,7 +686,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
         long threadId,
         final GridCacheVersion ver,
         final long topVer,
-        final Collection<K> keys,
+        final Collection<KeyCacheObject> keys,
         final boolean txRead,
         final long timeout,
         final long accessTtl,
@@ -717,15 +712,12 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
 
             boolean timedout = false;
 
-            for (K key : keys) {
+            for (KeyCacheObject key : keys) {
                 if (timedout)
                     break;
 
-                // TODO IGNITE-51.
-                KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
-
                 while (true) {
-                    GridDhtCacheEntry entry = entryExx(cacheKey, topVer);
+                    GridDhtCacheEntry entry = entryExx(key, topVer);
 
                     try {
                         fut.addEntry(key == null ? null : entry);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 7d007a1..29e9730 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -66,7 +66,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
     private long threadId;
 
     /** Keys to lock. */
-    private Collection<? extends K> keys;
+    private Collection<KeyCacheObject> keys;
 
     /** Future ID. */
     private IgniteUuid futId;
@@ -133,7 +133,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
      */
     public GridDhtColocatedLockFuture(
         GridCacheContext<K, V> cctx,
-        Collection<? extends K> keys,
+        Collection<KeyCacheObject> keys,
         @Nullable GridNearTxLocal tx,
         boolean read,
         boolean retval,
@@ -535,8 +535,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
             // Continue mapping on the same topology version as it was before.
             topSnapshot.compareAndSet(null, snapshot);
 
-            // TODO IGNITE-51.
-            // map(keys);
+            map(keys);
 
             markInitialized();
 
@@ -569,8 +568,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
 
                     topSnapshot.compareAndSet(null, snapshot);
 
-                    // TODO IGNITE-51.
-                    // map(keys);
+                    map(keys);
 
                     markInitialized();
                 }
@@ -881,9 +879,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
             threadId,
             lockVer,
             topVer,
-            // TODO IGNITE-51.
-            // keys,
-            null,
+            keys,
             read,
             timeout,
             accessTtl,
@@ -1244,7 +1240,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
                             }
 
                             // Set value to detached entry.
-                            entry.resetFromPrimary(newVal, null, dhtVer);
+                            entry.resetFromPrimary(newVal, dhtVer);
 
                             if (log.isDebugEnabled())
                                 log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
index 944034c..b9602d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
@@ -46,16 +46,12 @@ public class GridDhtDetachedCacheEntry extends GridDistributedCacheEntry {
      * Sets value to detached entry so it can be retrieved in transactional gets.
      *
      * @param val Value.
-     * @param valBytes Value bytes.
      * @param ver Version.
      * @throws IgniteCheckedException If value unmarshalling failed.
      */
-    public void resetFromPrimary(CacheObject val, byte[] valBytes, GridCacheVersion ver)
+    public void resetFromPrimary(CacheObject val, GridCacheVersion ver)
         throws IgniteCheckedException {
-       if (valBytes != null && val == null)
-            val = cctx.marshaller().unmarshal(valBytes, cctx.deploy().globalLoader());
-
-        value(val, valBytes);
+        value(val);
 
         this.ver = ver;
     }
@@ -66,7 +62,7 @@ public class GridDhtDetachedCacheEntry extends GridDistributedCacheEntry {
     }
 
     /** {@inheritDoc} */
-    @Override protected void value(@Nullable CacheObject val, @Nullable byte[] valBytes) {
+    @Override protected void value(@Nullable CacheObject val) {
         this.val = val;
     }
 
@@ -79,7 +75,7 @@ public class GridDhtDetachedCacheEntry extends GridDistributedCacheEntry {
     }
 
     /** {@inheritDoc} */
-    @Override protected void updateIndex(CacheObject val, byte[] valBytes, long expireTime,
+    @Override protected void updateIndex(CacheObject val, long expireTime,
         GridCacheVersion ver, CacheObject old) throws IgniteCheckedException {
         // No-op for detached entries, index is updated on primary nodes.
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
index 66efe48..1338788 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
@@ -503,7 +503,6 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
                     try {
                         if (entry.initialValue(
                             info.value(),
-                            null,
                             info.version(),
                             info.ttl(),
                             info.expireTime(),

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
index 6ebd8d2..4e5beed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
@@ -506,7 +506,6 @@ public class GridDhtPartitionDemandPool<K, V> {
                     if (preloadPred == null || preloadPred.apply(entry)) {
                         if (cached.initialValue(
                             entry.value(),
-                            null,
                             entry.version(),
                             entry.ttl(),
                             entry.expireTime(),

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index a30f372..4ebcced 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -219,7 +219,6 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
                         nodeId,
                         op,
                         val,
-                        valBytes,
                         null,
                         /*write-through*/false,
                         /*retval*/false,
@@ -316,7 +315,6 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
                             nodeId,
                             op,
                             op == TRANSFORM ? entryProcessor : val,
-                            null,
                             op == TRANSFORM ? req.invokeArguments() : null,
                             /*write-through*/false,
                             /*retval*/false,
@@ -650,7 +648,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override protected IgniteInternalFuture<Boolean> lockAllAsync(Collection<? extends K> keys,
+    @Override protected IgniteInternalFuture<Boolean> lockAllAsync(Collection<KeyCacheObject> keys,
         long timeout,
         @Nullable IgniteTxLocalEx tx,
         boolean isInvalidate,
@@ -659,7 +657,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
         @Nullable TransactionIsolation isolation,
         long accessTtl,
         IgnitePredicate<Cache.Entry<K, V>>[] filter) {
-        return dht.lockAllAsync(keys, timeout, filter);
+        return dht.lockAllAsync(null, timeout, filter);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index a32b6a8..b8f8c6f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -309,7 +309,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
         // init() will register future for responses if future has remote mappings.
         fut.init();
 
-        return ctx.wrapCloneMap(fut);
+        return fut;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index 2f6bebb..2d859a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -138,7 +138,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
 
                             if (isNew() || !valid(topVer)) {
                                 // Version does not change for load ops.
-                                update(e.value(), null, e.expireTime(), e.ttl(), e.isNew() ? ver : e.version());
+                                update(e.value(), e.expireTime(), e.ttl(), e.isNew() ? ver : e.version());
 
                                 if (cctx.deferredDelete()) {
                                     boolean deleted = val == null;
@@ -176,7 +176,6 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
      * This method should be called only when lock is owned on this entry.
      *
      * @param val Value.
-     * @param valBytes Value bytes.
      * @param ver Version.
      * @param dhtVer DHT version.
      * @param primaryNodeId Primary node ID.
@@ -185,27 +184,23 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
      * @throws IgniteCheckedException If failed.
      */
     @SuppressWarnings( {"RedundantTypeArguments"})
-    public boolean resetFromPrimary(CacheObject val, byte[] valBytes, GridCacheVersion ver, GridCacheVersion dhtVer,
-        UUID primaryNodeId) throws GridCacheEntryRemovedException, IgniteCheckedException {
+    public boolean resetFromPrimary(CacheObject val,
+        GridCacheVersion ver,
+        GridCacheVersion dhtVer,
+        UUID primaryNodeId)
+        throws GridCacheEntryRemovedException, IgniteCheckedException
+    {
         assert dhtVer != null;
 
         cctx.versions().onReceived(primaryNodeId, dhtVer);
 
-// TODO IGNITE-51.
-//        if (valBytes != null && val == null && !cctx.config().isStoreValueBytes()) {
-//            GridCacheVersion curDhtVer = dhtVersion();
-//
-//            if (!F.eq(dhtVer, curDhtVer))
-//                val = cctx.marshaller().<V>unmarshal(valBytes, cctx.deploy().globalLoader());
-//        }
-
         synchronized (this) {
             checkObsolete();
 
             this.primaryNodeId = primaryNodeId;
 
             if (!F.eq(this.dhtVer, dhtVer)) {
-                value(val, valBytes);
+                value(val);
 
                 this.ver = ver;
                 this.dhtVer = dhtVer;
@@ -222,14 +217,12 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
      *
      * @param dhtVer DHT version.
      * @param val Value associated with version.
-     * @param valBytes Value bytes.
      * @param expireTime Expire time.
      * @param ttl Time to live.
      * @param primaryNodeId Primary node ID.
      */
     public void updateOrEvict(GridCacheVersion dhtVer,
         @Nullable CacheObject val,
-        @Nullable byte[] valBytes,
         long expireTime,
         long ttl,
         UUID primaryNodeId)
@@ -248,7 +241,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
                 // If cannot evict, then update.
                 if (this.dhtVer == null) {
                     if (!markObsolete(dhtVer)) {
-                        value(val, valBytes);
+                        value(val);
 
                         ttlAndExpireTimeExtras((int) ttl, expireTime);
 
@@ -396,7 +389,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
 
                     // Change entry only if dht version has changed.
                     if (!dhtVer.equals(dhtVersion())) {
-                        update(val, valBytes, expireTime, ttl, ver);
+                        update(val, expireTime, ttl, ver);
 
                         if (cctx.deferredDelete()) {
                             boolean deleted = val == null && valBytes == null;
@@ -429,7 +422,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
     }
 
     /** {@inheritDoc} */
-    @Override protected void updateIndex(CacheObject val, byte[] valBytes, long expireTime,
+    @Override protected void updateIndex(CacheObject val, long expireTime,
         GridCacheVersion ver, CacheObject old) throws IgniteCheckedException {
         // No-op: queries are disabled for near cache.
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index b880966..d89b59d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -1029,7 +1029,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
 
                                         // Lock is held at this point, so we can set the
                                         // returned value if any.
-                                        entry.resetFromPrimary(newVal, null, lockVer, dhtVer, node.id());
+                                        entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id());
 
                                         entry.readyNearLock(lockVer, mappedVer, res.committedVersions(),
                                             res.rolledbackVersions(), res.pending());
@@ -1387,7 +1387,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
 
                             // Lock is held at this point, so we can set the
                             // returned value if any.
-                            entry.resetFromPrimary(newVal, null, lockVer, dhtVer, node.id());
+                            entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id());
 
                             if (inTx() && implicitTx() && tx.onePhaseCommit()) {
                                 boolean pass = res.filterResult(i);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index 6e88c4f..e35e0fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -116,7 +116,12 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
         if (tx != null && !tx.implicit() && !skipTx) {
             return asyncOp(tx, new AsyncOp<Map<K, V>>(keys) {
                 @Override public IgniteInternalFuture<Map<K, V>> op(IgniteTxLocalAdapter tx) {
-                    return ctx.wrapCloneMap(tx.<K, V>getAllAsync(ctx, keys, entry, deserializePortable, skipVals));
+                    return tx.getAllAsync(ctx,
+                        ctx.cacheKeysView(keys),
+                        entry,
+                        deserializePortable,
+                        skipVals,
+                        false);
                 }
             });
         }
@@ -400,7 +405,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
 
     /** {@inheritDoc} */
     @Override protected IgniteInternalFuture<Boolean> lockAllAsync(
-        Collection<? extends K> keys,
+        Collection<KeyCacheObject> keys,
         long timeout,
         IgniteTxLocalEx tx,
         boolean isInvalidate,
@@ -411,9 +416,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
         IgnitePredicate<Cache.Entry<K, V>>[] filter
     ) {
         GridNearLockFuture<K, V> fut = new GridNearLockFuture<>(ctx,
-            // TODO IGNITE-51
-            // keys,
-            null,
+            keys,
             (GridNearTxLocal)tx,
             isRead,
             retval,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index d899562..8d6800d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -280,7 +280,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<Boolean> loadMissing(
-        GridCacheContext cacheCtx,
+        final GridCacheContext cacheCtx,
         boolean readThrough,
         boolean async,
         final Collection<KeyCacheObject> keys,
@@ -288,69 +288,67 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
         boolean skipVals,
         final IgniteBiInClosure<KeyCacheObject, Object> c
     ) {
-        return null;
-// TODO IGNITE-51.
-//        if (cacheCtx.isNear()) {
-//            return cacheCtx.nearTx().txLoadAsync(this,
-//                keys,
-//                readThrough,
-//                deserializePortable,
-//                accessPolicy(cacheCtx, keys),
-//                skipVals).chain(new C1<IgniteInternalFuture<Map<K, V>>, Boolean>() {
-//                @Override public Boolean apply(IgniteInternalFuture<Map<K, V>> f) {
-//                    try {
-//                        Map<K, V> map = f.get();
-//
-//                        // Must loop through keys, not map entries,
-//                        // as map entries may not have all the keys.
-//                        for (K key : keys)
-//                            c.apply(key, map.get(key));
-//
-//                        return true;
-//                    }
-//                    catch (Exception e) {
-//                        setRollbackOnly();
-//
-//                        throw new GridClosureException(e);
-//                    }
-//                }
-//            });
-//        }
-//        else if (cacheCtx.isColocated()) {
-//            return cacheCtx.colocated().loadAsync(keys,
-//                readThrough,
-//                /*reload*/false,
-//                /*force primary*/false,
-//                topologyVersion(),
-//                CU.subjectId(this, cctx),
-//                resolveTaskName(),
-//                deserializePortable,
-//                accessPolicy(cacheCtx, keys),
-//                skipVals).chain(new C1<IgniteInternalFuture<Map<K, V>>, Boolean>() {
-//                    @Override public Boolean apply(IgniteInternalFuture<Map<K, V>> f) {
-//                        try {
-//                            Map<K, V> map = f.get();
-//
-//                            // Must loop through keys, not map entries,
-//                            // as map entries may not have all the keys.
-//                            for (K key : keys)
-//                                c.apply(key, map.get(key));
-//
-//                            return true;
-//                        }
-//                        catch (Exception e) {
-//                            setRollbackOnly();
-//
-//                            throw new GridClosureException(e);
-//                        }
-//                    }
-//                });
-//        }
-//        else {
-//            assert cacheCtx.isLocal();
-//
-//            return super.loadMissing(cacheCtx, readThrough, async, keys, deserializePortable, skipVals, c);
-//        }
+        if (cacheCtx.isNear()) {
+            return cacheCtx.nearTx().txLoadAsync(this,
+                keys,
+                readThrough,
+                deserializePortable,
+                accessPolicy(cacheCtx, keys),
+                skipVals).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Boolean>() {
+                @Override public Boolean apply(IgniteInternalFuture<Map<Object, Object>> f) {
+                    try {
+                        Map<Object, Object> map = f.get();
+
+                        // Must loop through keys, not map entries,
+                        // as map entries may not have all the keys.
+                        for (KeyCacheObject key : keys)
+                            c.apply(key, map.get(key.value(cacheCtx, false)));
+
+                        return true;
+                    }
+                    catch (Exception e) {
+                        setRollbackOnly();
+
+                        throw new GridClosureException(e);
+                    }
+                }
+            });
+        }
+        else if (cacheCtx.isColocated()) {
+            return cacheCtx.colocated().loadAsync(keys,
+                readThrough,
+                /*reload*/false,
+                /*force primary*/false,
+                topologyVersion(),
+                CU.subjectId(this, cctx),
+                resolveTaskName(),
+                deserializePortable,
+                accessPolicy(cacheCtx, keys),
+                skipVals).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Boolean>() {
+                    @Override public Boolean apply(IgniteInternalFuture<Map<Object, Object>> f) {
+                        try {
+                            Map<Object, Object> map = f.get();
+
+                            // Must loop through keys, not map entries,
+                            // as map entries may not have all the keys.
+                            for (KeyCacheObject key : keys)
+                                c.apply(key, map.get(key.value(cacheCtx, false)));
+
+                            return true;
+                        }
+                        catch (Exception e) {
+                            setRollbackOnly();
+
+                            throw new GridClosureException(e);
+                        }
+                    }
+                });
+        }
+        else {
+            assert cacheCtx.isLocal();
+
+            return super.loadMissing(cacheCtx, readThrough, async, keys, deserializePortable, skipVals, c);
+        }
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
index 9af91c6..491a171 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
@@ -949,7 +949,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
 
                                     IgniteBiTuple<GridCacheVersion, CacheObject> tup = entry.getValue();
 
-                                    nearEntry.resetFromPrimary(tup.get2(), null, tx.xidVersion(),
+                                    nearEntry.resetFromPrimary(tup.get2(), tx.xidVersion(),
                                         tup.get1(), m.node().id());
                                 }
                                 else if (txEntry.cached().detached()) {
@@ -957,7 +957,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
 
                                     IgniteBiTuple<GridCacheVersion, CacheObject> tup = entry.getValue();
 
-                                    detachedEntry.resetFromPrimary(tup.get2(), null, tx.xidVersion());
+                                    detachedEntry.resetFromPrimary(tup.get2(), tx.xidVersion());
                                 }
 
                                 break;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
index 42e8600..e4111ad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
@@ -257,61 +257,61 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
         }
 
         switch (writer.state()) {
-            case 23:
+            case 24:
                 if (!writer.writeIgniteUuid("futId", futId))
                     return false;
 
                 writer.incrementState();
 
-            case 24:
+            case 25:
                 if (!writer.writeBoolean("implicitSingle", implicitSingle))
                     return false;
 
                 writer.incrementState();
 
-            case 25:
+            case 26:
                 if (!writer.writeBoolean("last", last))
                     return false;
 
                 writer.incrementState();
 
-            case 26:
+            case 27:
                 if (!writer.writeCollection("lastBackups", lastBackups, MessageCollectionItemType.UUID))
                     return false;
 
                 writer.incrementState();
 
-            case 27:
+            case 28:
                 if (!writer.writeIgniteUuid("miniId", miniId))
                     return false;
 
                 writer.incrementState();
 
-            case 28:
+            case 29:
                 if (!writer.writeBoolean("near", near))
                     return false;
 
                 writer.incrementState();
 
-            case 29:
+            case 30:
                 if (!writer.writeBoolean("retVal", retVal))
                     return false;
 
                 writer.incrementState();
 
-            case 30:
+            case 31:
                 if (!writer.writeUuid("subjId", subjId))
                     return false;
 
                 writer.incrementState();
 
-            case 31:
+            case 32:
                 if (!writer.writeInt("taskNameHash", taskNameHash))
                     return false;
 
                 writer.incrementState();
 
-            case 32:
+            case 33:
                 if (!writer.writeLong("topVer", topVer))
                     return false;
 
@@ -333,7 +333,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
             return false;
 
         switch (reader.state()) {
-            case 23:
+            case 24:
                 futId = reader.readIgniteUuid("futId");
 
                 if (!reader.isLastRead())
@@ -341,7 +341,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 24:
+            case 25:
                 implicitSingle = reader.readBoolean("implicitSingle");
 
                 if (!reader.isLastRead())
@@ -349,7 +349,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 25:
+            case 26:
                 last = reader.readBoolean("last");
 
                 if (!reader.isLastRead())
@@ -357,7 +357,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 26:
+            case 27:
                 lastBackups = reader.readCollection("lastBackups", MessageCollectionItemType.UUID);
 
                 if (!reader.isLastRead())
@@ -365,7 +365,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 27:
+            case 28:
                 miniId = reader.readIgniteUuid("miniId");
 
                 if (!reader.isLastRead())
@@ -373,7 +373,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 28:
+            case 29:
                 near = reader.readBoolean("near");
 
                 if (!reader.isLastRead())
@@ -381,7 +381,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 29:
+            case 30:
                 retVal = reader.readBoolean("retVal");
 
                 if (!reader.isLastRead())
@@ -389,7 +389,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 30:
+            case 31:
                 subjId = reader.readUuid("subjId");
 
                 if (!reader.isLastRead())
@@ -397,7 +397,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 31:
+            case 32:
                 taskNameHash = reader.readInt("taskNameHash");
 
                 if (!reader.isLastRead())
@@ -405,7 +405,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 32:
+            case 33:
                 topVer = reader.readLong("topVer");
 
                 if (!reader.isLastRead())
@@ -425,7 +425,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 33;
+        return 34;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
index 273a184..78cfb73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
@@ -224,14 +224,16 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
     @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
         super.prepareMarshal(ctx);
 
-        GridCacheContext cctx = ctx.cacheContext(cacheId);
-
         if (ownedVals != null && ownedValsBytes == null) {
             ownedValsBytes = new ArrayList<>(ownedVals.size());
 
             for (Map.Entry<IgniteTxKey, IgniteBiTuple<GridCacheVersion, CacheObject>> entry : ownedVals.entrySet()) {
                 IgniteBiTuple<GridCacheVersion, CacheObject> tup = entry.getValue();
 
+                GridCacheContext cctx = ctx.cacheContext(entry.getKey().cacheId());
+
+                entry.getKey().prepareMarshal(cctx);
+
                 CacheObject val = tup.get2();
 
                 if (val != null)
@@ -245,8 +247,11 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
             retValBytes = ctx.marshaller().marshal(retVal);
 
         if (filterFailedKeys != null) {
-            for (IgniteTxKey key :filterFailedKeys)
+            for (IgniteTxKey key :filterFailedKeys) {
+                GridCacheContext cctx = ctx.cacheContext(key.cacheId());
+
                 key.prepareMarshal(cctx);
+            }
         }
     }
 
@@ -254,8 +259,6 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
     @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
         super.finishUnmarshal(ctx, ldr);
 
-        GridCacheContext cctx = ctx.cacheContext(cacheId);
-
         if (ownedValsBytes != null && ownedVals == null) {
             ownedVals = new HashMap<>();
 
@@ -264,6 +267,10 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
 
                 CacheObject val = tup.get3();
 
+                GridCacheContext cctx = ctx.cacheContext(tup.get1().cacheId());
+
+                tup.get1().finishUnmarshal(cctx, ldr);
+
                 if (val != null)
                     val.finishUnmarshal(cctx, ldr);
 
@@ -275,8 +282,11 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
             retVal = ctx.marshaller().unmarshal(retValBytes, ldr);
 
         if (filterFailedKeys != null) {
-            for (IgniteTxKey key :filterFailedKeys)
+            for (IgniteTxKey key :filterFailedKeys) {
+                GridCacheContext cctx = ctx.cacheContext(key.cacheId());
+
                 key.finishUnmarshal(cctx, ldr);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
index 7a0c5d8..4c59437 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
@@ -104,7 +104,7 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<Boolean> txLockAsync(Collection<? extends K> keys,
+    @Override public IgniteInternalFuture<Boolean> txLockAsync(Collection<KeyCacheObject> keys,
         long timeout,
         IgniteTxLocalEx tx,
         boolean isRead,
@@ -121,7 +121,7 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> {
         IgnitePredicate<Cache.Entry<K, V>>[] filter) {
         IgniteTxLocalEx tx = ctx.tm().localTx();
 
-        return lockAllAsync(keys, timeout, tx, filter);
+        return lockAllAsync(ctx.cacheKeysView(keys), timeout, tx, filter);
     }
 
     /**
@@ -131,7 +131,7 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> {
      * @param filter Filter.
      * @return Future.
      */
-    public IgniteInternalFuture<Boolean> lockAllAsync(Collection<? extends K> keys,
+    public IgniteInternalFuture<Boolean> lockAllAsync(Collection<KeyCacheObject> keys,
         long timeout,
         @Nullable IgniteTxLocalEx tx,
         IgnitePredicate<Cache.Entry<K, V>>[] filter) {
@@ -141,12 +141,12 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> {
         GridLocalLockFuture<K, V> fut = new GridLocalLockFuture<>(ctx, keys, tx, this, timeout, filter);
 
         try {
-            for (K key : keys) {
+            for (KeyCacheObject key : keys) {
                 while (true) {
                     GridLocalCacheEntry entry = null;
 
                     try {
-                        entry = entryExx(ctx.toCacheKeyObject(key));
+                        entry = entryExx(key);
 
                         if (!ctx.isAll(entry, filter)) {
                             fut.onFailed();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
index b2797a4..4e769e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
@@ -107,7 +107,7 @@ public final class GridLocalLockFuture<K, V> extends GridFutureAdapter<Boolean>
      */
     GridLocalLockFuture(
         GridCacheContext<K, V> cctx,
-        Collection<? extends K> keys,
+        Collection<KeyCacheObject> keys,
         IgniteTxLocalEx tx,
         GridLocalCache<K, V> cache,
         long timeout,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index 833f8c9..8aa6a63 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -1014,7 +1014,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
                 try {
                     entry = entryEx(cacheKey);
 
-                    GridTuple3<Boolean, CacheObject, EntryProcessorResult<Object>> t = entry.innerUpdateLocal(
+                    GridTuple3<Boolean, Object, EntryProcessorResult<Object>> t = entry.innerUpdateLocal(
                         ver,
                         val == null ? DELETE : op,
                         val,
@@ -1452,7 +1452,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
 
                 assert writeVal != null || op == DELETE : "null write value found.";
 
-                GridTuple3<Boolean, CacheObject, EntryProcessorResult<Object>> t = entry.innerUpdateLocal(
+                GridTuple3<Boolean, Object, EntryProcessorResult<Object>> t = entry.innerUpdateLocal(
                     ver,
                     op,
                     writeVal,
@@ -1468,12 +1468,12 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
                     taskName);
 
                 if (intercept) {
-                    if (op == UPDATE)
+                    if (op == UPDATE) {
                         ctx.config().getInterceptor().onAfterPut(entry.key().value(ctx, false),
-                                writeVal.value(ctx, false));
+                            writeVal.value(ctx, false));
+                    }
                     else
-                        ctx.config().getInterceptor().onAfterRemove(entry.key().value(ctx, false),
-                                CU.value(t.get2(), ctx, false));
+                        ctx.config().getInterceptor().onAfterRemove(entry.key().value(ctx, false), t.get2());
                 }
             }
             catch (GridCacheEntryRemovedException ignore) {
@@ -1555,7 +1555,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<Boolean> txLockAsync(Collection<? extends K> keys,
+    @Override public IgniteInternalFuture<Boolean> txLockAsync(Collection<KeyCacheObject> keys,
         long timeout,
         IgniteTxLocalEx tx,
         boolean isRead,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index c26ca4b..563d38c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -1193,7 +1193,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
      * @throws IgniteCheckedException If failed to get previous value for transform.
      * @throws GridCacheEntryRemovedException If entry was concurrently deleted.
      */
-    protected GridTuple3<GridCacheOperation, CacheObject, byte[]> applyTransformClosures(
+    protected IgniteBiTuple<GridCacheOperation, CacheObject> applyTransformClosures(
         IgniteTxEntry txEntry,
         boolean metrics) throws GridCacheEntryRemovedException, IgniteCheckedException {
         GridCacheContext cacheCtx = txEntry.context();
@@ -1201,10 +1201,10 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
         assert cacheCtx != null;
 
         if (isSystemInvalidate())
-            return F.t(cacheCtx.writeThrough() ? RELOAD : DELETE, null, null);
+            return F.t(cacheCtx.writeThrough() ? RELOAD : DELETE, null);
 
         if (F.isEmpty(txEntry.entryProcessors()))
-            return F.t(txEntry.op(), txEntry.value(), null);
+            return F.t(txEntry.op(), txEntry.value());
         else {
             try {
                 boolean recordEvt = cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ);
@@ -1246,14 +1246,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
                     modified |= invokeEntry.modified();
                 }
 
-                if (modified) {
-                    cacheVal = cacheCtx.toCacheObject(val);
-// TODO IGNITE-51
-//                    val = (V)cacheCtx.<V>unwrapTemporary(val);
-//
-//                    if (cacheCtx.portableEnabled())
-//                        val = (V)cacheCtx.marshalToPortable(val);
-                }
+                if (modified)
+                    cacheVal = cacheCtx.toCacheObject(cacheCtx.unwrapTemporary(val));
 
                 GridCacheOperation op = modified ? (val == null ? DELETE : UPDATE) : NOOP;
 
@@ -1270,7 +1264,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
                     }
                 }
 
-                return F.t(op, cacheVal, null);
+                return F.t(op, cacheVal);
             }
             catch (GridCacheFilterFailedException e) {
                 assert false : "Empty filter failed for innerGet: " + e;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 6b1881d..66eea11 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -1044,7 +1044,8 @@ public class IgniteTxEntry implements GridPeerDeployAware, Externalizable, Optim
         public void readFrom(ObjectInput in) throws IOException, ClassNotFoundException {
             hasWriteVal = in.readBoolean();
 
-            val = (CacheObject)in.readObject();
+            if (hasWriteVal)
+                val = (CacheObject)in.readObject();
 
             op = fromOrdinal(in.readInt());
 // TODO IGNITE-51.


[6/6] incubator-ignite git commit: # ignite-51 minor

Posted by sb...@apache.org.
# ignite-51 minor


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7f9a6630
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7f9a6630
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7f9a6630

Branch: refs/heads/ignite-51
Commit: 7f9a6630b9480c580b4312a44e20eeac866fe4a0
Parents: 810be9a
Author: sboikov <sb...@gridgain.com>
Authored: Mon Mar 2 16:29:23 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Mar 2 16:29:23 2015 +0300

----------------------------------------------------------------------
 .../spi/communication/GridCacheMessageSelfTest.java     | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7f9a6630/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
index c011bbc..226e35b 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
@@ -101,16 +101,16 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest {
             }
         });
 
-        TestMessage message = new TestMessage();
+        TestMessage msg = new TestMessage();
 
         for (int i = 1; i <= SAMPLE_CNT; i++) {
-            mgr0.send(grid(1).localNode(), topic, message, GridIoPolicy.PUBLIC_POOL);
+            mgr0.send(grid(1).localNode(), topic, msg, GridIoPolicy.PUBLIC_POOL);
 
             TestMessage1 mes1 = new TestMessage1();
 
             mes1.init(new GridTestMessage(grid(1).localNode().id(), i, 0));
 
-            message.add(mes1);
+            msg.add(mes1);
         }
 
         assert latch.await(3, SECONDS);
@@ -180,7 +180,11 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest {
         }
     }
 
+    /**
+     *
+     */
     private static class TestMessage1 extends GridCacheMessage {
+        /** */
         GridTestMessage mes;
 
         public void init(GridTestMessage mes) {
@@ -189,7 +193,7 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest {
 
         /** {@inheritDoc} */
         @Override public byte directType() {
-            return 0;
+            return DIRECT_TYPE1;
         }
 
         /** {@inheritDoc} */


[5/6] incubator-ignite git commit: Merge remote-tracking branch 'origin/ignite-51' into ignite-51

Posted by sb...@apache.org.
Merge remote-tracking branch 'origin/ignite-51' into ignite-51


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/810be9a2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/810be9a2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/810be9a2

Branch: refs/heads/ignite-51
Commit: 810be9a2fc62f0cc7837ed3d7b8a55b71fc66981
Parents: ea5bd46 2f43d32
Author: sboikov <sb...@gridgain.com>
Authored: Mon Mar 2 16:25:49 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Mar 2 16:25:49 2015 +0300

----------------------------------------------------------------------
 .../communication/GridCacheMessageSelfTest.java | 88 ++++++++++++++++++--
 1 file changed, 79 insertions(+), 9 deletions(-)
----------------------------------------------------------------------