You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/03/02 22:22:36 UTC

[6/6] incubator-ignite git commit: # ignite-51

# ignite-51


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ea39d669
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ea39d669
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ea39d669

Branch: refs/heads/ignite-51
Commit: ea39d669b0964b0d420c4e21f0f862667c88c41d
Parents: 6445389
Author: sboikov <sb...@gridgain.com>
Authored: Mon Mar 2 18:41:15 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Mar 3 00:21:43 2015 +0300

----------------------------------------------------------------------
 .../internal/GridEventConsumeHandler.java       |   2 +-
 .../communication/GridIoMessageFactory.java     |   6 +
 .../processors/cache/GridCacheAdapter.java      |  24 +-
 .../processors/cache/GridCacheEntryEx.java      |  21 +-
 .../processors/cache/GridCacheEntryInfo.java    |   6 +-
 .../processors/cache/GridCacheMapEntry.java     | 161 ++---------
 .../processors/cache/GridCacheSwapManager.java  |   4 +-
 .../distributed/GridCacheTtlUpdateRequest.java  |   4 +-
 .../GridDistributedTxRemoteAdapter.java         |  40 +--
 .../distributed/dht/GridDhtCacheAdapter.java    |  12 +-
 .../distributed/dht/GridDhtCacheEntry.java      |  11 +-
 .../cache/distributed/dht/GridDhtGetFuture.java |   6 +-
 .../distributed/dht/GridDhtLockFuture.java      |   2 +-
 .../distributed/dht/GridDhtLockRequest.java     |   2 +-
 .../distributed/dht/GridDhtTxLocalAdapter.java  |  10 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java |  16 +-
 .../dht/GridDhtTxPrepareResponse.java           |  24 +-
 .../cache/distributed/dht/GridDhtTxRemote.java  |   2 +-
 .../dht/atomic/GridDhtAtomicCache.java          |   2 +-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |  26 +-
 .../colocated/GridDhtColocatedLockFuture.java   |   2 +-
 .../colocated/GridDhtDetachedCacheEntry.java    |   7 +-
 .../distributed/near/GridNearCacheEntry.java    |  11 +-
 .../distributed/near/GridNearLockFuture.java    |   2 +-
 .../cache/distributed/near/GridNearTxLocal.java |   6 +-
 .../near/GridNearTxPrepareFuture.java           |   8 +-
 .../distributed/near/GridNearTxRemote.java      |   2 +-
 .../cache/local/GridLocalTxFuture.java          |   4 +-
 .../continuous/CacheContinuousQueryEntry.java   | 272 +++++++++++--------
 .../continuous/CacheContinuousQueryEvent.java   |  32 ++-
 .../continuous/CacheContinuousQueryHandler.java |  64 ++---
 .../continuous/CacheContinuousQueryManager.java |  80 +++---
 .../cache/transactions/IgniteTxEntry.java       |  29 +-
 .../transactions/IgniteTxLocalAdapter.java      |  18 +-
 .../cache/transactions/IgniteTxManager.java     |   8 +-
 .../continuous/GridContinuousMessage.java       |  43 ++-
 .../continuous/GridContinuousProcessor.java     |  40 ++-
 .../cache/GridCacheAbstractFullApiSelfTest.java |   2 +-
 .../cache/GridCacheStoreValueBytesSelfTest.java |  21 --
 .../processors/cache/GridCacheTestEntryEx.java  |  23 +-
 ...achePartitionedMultiNodeCounterSelfTest.java |   4 +-
 41 files changed, 470 insertions(+), 589 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index 68d8c0b..1b0c09c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -150,7 +150,7 @@ class GridEventConsumeHandler implements GridContinuousHandler {
                                     }
                                 }
 
-                                ctx.continuous().addNotification(nodeId, routineId, wrapper, null, false);
+                                ctx.continuous().addNotification(nodeId, routineId, wrapper, null, false, false);
                             }
                             catch (IgniteCheckedException e) {
                                 U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, e);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 9642bfb..57b5ac4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
 import org.apache.ignite.internal.processors.cache.distributed.near.*;
 import org.apache.ignite.internal.processors.cache.query.*;
+import org.apache.ignite.internal.processors.cache.query.continuous.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.processors.clock.*;
@@ -528,6 +529,11 @@ public class GridIoMessageFactory implements MessageFactory {
 
                 break;
 
+            case 96:
+                msg = new CacheContinuousQueryEntry();
+
+                break;
+
             default:
                 if (ext != null) {
                     for (MessageFactory factory : ext) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 34d65bf..58e21f5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -963,12 +963,12 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
                 if (peek != null) {
                     CacheObject v = peek.get();
 
-                    return v.value(ctx, true);
-// TODO IGNITE-51
-//                    if (ctx.portableEnabled())
-//                        v = (V)ctx.unwrapPortableIfNeeded(v, ctx.keepPortable());
-//
-//                    return F.t(ctx.cloneOnFlag(v));
+                    Object val0 = v.value(ctx, true);
+
+                    if (ctx.portableEnabled())
+                        val0 = ctx.unwrapPortableIfNeeded(v, ctx.keepPortable());
+
+                    return F.t((V)val0);
                 }
             }
 
@@ -980,12 +980,12 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
                 if (peek != null) {
                     CacheObject v = peek.get();
 
-                    return v.value(ctx, true);
-// TODO IGNITE-51
-//                    if (ctx.portableEnabled())
-//                        v = (V)ctx.unwrapPortableIfNeeded(v, ctx.keepPortable());
-//
-//                    return F.t(ctx.cloneOnFlag(v));
+                    Object val0 = v.value(ctx, true);
+
+                    if (ctx.portableEnabled())
+                        val0 = ctx.unwrapPortableIfNeeded(v, ctx.keepPortable());
+
+                    return F.t((V) val0);
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 30df242..5196965 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -565,17 +565,6 @@ public interface GridCacheEntryEx {
     public boolean markObsoleteVersion(GridCacheVersion ver);
 
     /**
-     * @return Key bytes.
-     */
-    public byte[] keyBytes();
-
-    /**
-     * @return Key bytes.
-     * @throws IgniteCheckedException If marshalling failed.
-     */
-    public byte[] getOrMarshalKeyBytes() throws IgniteCheckedException;
-
-    /**
      * @return Version.
      * @throws GridCacheEntryRemovedException If entry has been removed.
      */
@@ -858,16 +847,10 @@ public interface GridCacheEntryEx {
     @Nullable public GridCacheMvccCandidate localOwner() throws GridCacheEntryRemovedException;
 
     /**
-     * @param keyBytes Key bytes.
-     * @throws GridCacheEntryRemovedException If entry was removed.
-     */
-    public void keyBytes(byte[] keyBytes) throws GridCacheEntryRemovedException;
-
-    /**
      * @return Value bytes.
      * @throws GridCacheEntryRemovedException If entry was removed.
      */
-    public GridCacheValueBytes valueBytes() throws GridCacheEntryRemovedException;
+    public CacheObject valueBytes() throws GridCacheEntryRemovedException;
 
     /**
      * Gets cached serialized value bytes.
@@ -877,7 +860,7 @@ public interface GridCacheEntryEx {
      * @throws IgniteCheckedException If serialization failed.
      * @throws GridCacheEntryRemovedException If entry was removed.
      */
-    @Nullable public GridCacheValueBytes valueBytes(@Nullable GridCacheVersion ver)
+    @Nullable public CacheObject valueBytes(@Nullable GridCacheVersion ver)
         throws IgniteCheckedException, GridCacheEntryRemovedException;
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
index 96d7ee2..0179ad0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
@@ -325,7 +325,8 @@ public class GridCacheEntryInfo implements Externalizable, Message {
     public void marshal(GridCacheContext ctx) throws IgniteCheckedException {
         key.prepareMarshal(ctx.cacheObjectContext());
 
-        val.prepareMarshal(ctx.cacheObjectContext());
+        if (val != null)
+            val.prepareMarshal(ctx.cacheObjectContext());
 // TODO IGNITE-51
 //        boolean depEnabled = ctx.gridDeploy().enabled();
 //
@@ -352,7 +353,8 @@ public class GridCacheEntryInfo implements Externalizable, Message {
     public void unmarshal(GridCacheContext ctx, ClassLoader clsLdr) throws IgniteCheckedException {
         key.finishUnmarshal(ctx, clsLdr);
 
-        val.finishUnmarshal(ctx, clsLdr);
+        if (val != null)
+            val.finishUnmarshal(ctx, clsLdr);
 // TODO IGNITE-51
 //        Marshaller mrsh = ctx.marshaller();
 //

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 21255e2..7d1dc73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -272,33 +272,21 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
     /**
      * @return Value bytes.
      */
-    protected GridCacheValueBytes valueBytesUnlocked() {
+    protected CacheObject valueBytesUnlocked() {
         assert Thread.holdsLock(this);
 
-        if (!isOffHeapValuesOnly()) {
-// TODO IGNITE-51.
-//            if (valBytes != null)
-//                return GridCacheValueBytes.marshaled(valBytes);
+        CacheObject val0 = val;
 
-            try {
-                if (valPtr != 0 && cctx.offheapTiered())
-                    return offheapValueBytes();
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException(e);
-            }
-        }
-        else {
-            if (valPtr != 0) {
-                GridUnsafeMemory mem = cctx.unsafeMemory();
-
-                assert mem != null;
+        if (val0 == null && valPtr != 0) {
+            IgniteBiTuple<byte[], Boolean> t = valueBytes0();
 
-                return mem.getOffHeap(valPtr);
-            }
+            if (t.get2())
+                val0 = cctx.toCacheObject(t.get1(), null);
+            else
+                val0 = cctx.toCacheObject(null, t.get1());
         }
 
-        return GridCacheValueBytes.nil();
+        return val0;
     }
 
     /** {@inheritDoc} */
@@ -439,33 +427,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
                     info.setNew(isStartVersion());
                     info.setDeleted(deletedUnlocked());
 
-                    if (!expired) {
-                        CacheObject val0 = val;
-
-                        if (val0 == null && valPtr != 0) {
-                            IgniteBiTuple<byte[], Boolean> t = valueBytes0();
-
-                            if (t.get2())
-                                val0 = cctx.toCacheObject(t.get1(), null);
-                            else
-                                val0 = cctx.toCacheObject(null, t.get1());
-
-                        }
-
-                        info.value(val0);
-// TODO IGNITE-51.
-//                        info.value(cctx.kernalContext().config().isPeerClassLoadingEnabled() ?
-//                            rawGetOrUnmarshalUnlocked(false) : val);
-//
-//                        GridCacheValueBytes valBytes = valueBytesUnlocked();
-//
-//                        if (!valBytes.isNull()) {
-//                            if (valBytes.isPlain())
-//                                info.value((V)valBytes.get());
-//                            else
-//                                info.valueBytes(valBytes.get());
-//                        }
-                   }
+                    if (!expired)
+                        info.value(valueBytesUnlocked());
                 }
             }
         }
@@ -611,6 +574,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
      * @return Value bytes and flag indicating whether value is byte array.
      */
     protected IgniteBiTuple<byte[], Boolean> valueBytes0() {
+        assert Thread.holdsLock(this);
+
         if (valPtr != 0) {
             assert isOffHeapValuesOnly() || cctx.offheapTiered();
 
@@ -827,7 +792,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
                         taskName);
                 }
 
-                cctx.continuousQueries().onEntryExpired(this, key, expiredVal, null);
+                cctx.continuousQueries().onEntryExpired(this, key, expiredVal);
 
                 // No more notifications.
                 evt = false;
@@ -1064,8 +1029,6 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
 
             old = (retval || intercept) ? rawGetOrUnmarshalUnlocked(!retval) : this.val;
 
-            GridCacheValueBytes oldBytes = valueBytesUnlocked();
-
             if (intercept) {
                 key0 = key.value(cctx, false);
                 val0 = CU.value(val, cctx, false);
@@ -1138,7 +1101,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
             }
 
             if (cctx.isLocal() || cctx.isReplicated() || (tx != null && tx.local() && !isNear()))
-                cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), old, oldBytes, false);
+                cctx.continuousQueries().onEntryUpdated(this, key, val, old, false);
 
             cctx.dataStructures().onEntryUpdated(key, false);
         }
@@ -1228,8 +1191,6 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
                 }
             }
 
-            GridCacheValueBytes oldBytes = valueBytesUnlocked();
-
             if (old == null)
                 old = saveValueForIndexUnlocked();
 
@@ -1295,7 +1256,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
             }
 
                 if (cctx.isLocal() || cctx.isReplicated() || (tx != null && tx.local() && !isNear()))
-                    cctx.continuousQueries().onEntryUpdated(this, key, null, null, old, oldBytes, false);
+                    cctx.continuousQueries().onEntryUpdated(this, key, null, old, false);
 
             cctx.dataStructures().onEntryUpdated(key, true);
         }
@@ -1388,8 +1349,6 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
             // Possibly get old value form store.
             old = needVal ? rawGetOrUnmarshalUnlocked(!retval) : val;
 
-            GridCacheValueBytes oldBytes = valueBytesUnlocked();
-
             boolean readThrough = false;
 
             Object old0 = null;
@@ -1618,7 +1577,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
             if (res)
                 updateMetrics(op, metrics);
 
-            cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), old, oldBytes, false);
+            cctx.continuousQueries().onEntryUpdated(this, key, val, old, false);
 
             cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE);
 
@@ -1821,7 +1780,6 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
 
             // Prepare old value and value bytes.
             oldVal = needVal ? rawGetOrUnmarshalUnlocked(!retval) : val;
-            GridCacheValueBytes oldValBytes = valueBytesUnlocked();
 
             // Possibly read value from store.
             boolean readThrough = false;
@@ -2196,8 +2154,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
                 updateMetrics(op, metrics);
 
             if (cctx.isReplicated() || primary)
-                cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(),
-                    oldVal, oldValBytes, false);
+                cctx.continuousQueries().onEntryUpdated(this, key, val, oldVal, false);
 
             cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE);
 
@@ -3298,8 +3255,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
 
                 if (!skipQryNtf) {
                     if (cctx.isLocal() || cctx.isReplicated() || cctx.affinity().primary(cctx.localNode(), key, topVer))
-                        cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), null, null,
-                            preload);
+                        cctx.continuousQueries().onEntryUpdated(this, key, val, null, preload);
 
                     cctx.dataStructures().onEntryUpdated(key, false);
                 }
@@ -3617,7 +3573,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
                             null);
                     }
 
-                    cctx.continuousQueries().onEntryExpired(this, key, expiredVal, null);
+                    cctx.continuousQueries().onEntryExpired(this, key, expiredVal);
                 }
             }
         }
@@ -3707,84 +3663,25 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
     }
 
     /** {@inheritDoc} */
-    @Override public synchronized void keyBytes(byte[] keyBytes) throws GridCacheEntryRemovedException {
-        checkObsolete();
-
-// TODO IGNITE-51.
-//        if (keyBytes != null)
-//            this.keyBytes = keyBytes;
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized byte[] keyBytes() {
-// TODO IGNITE-51.
-//        return keyBytes;
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte[] getOrMarshalKeyBytes() throws IgniteCheckedException {
-// TODO IGNITE-51.
-//        byte[] bytes = keyBytes();
-//
-//        if (bytes != null)
-//            return bytes;
-//
-//        bytes = CU.marshal(cctx.shared(), key);
-//
-//        synchronized (this) {
-//            keyBytes = bytes;
-//        }
-//
-//        return bytes;
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized GridCacheValueBytes valueBytes() throws GridCacheEntryRemovedException {
+    @Override public synchronized CacheObject valueBytes() throws GridCacheEntryRemovedException {
         checkObsolete();
 
         return valueBytesUnlocked();
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public GridCacheValueBytes valueBytes(@Nullable GridCacheVersion ver)
+    @Nullable @Override public CacheObject valueBytes(@Nullable GridCacheVersion ver)
         throws IgniteCheckedException, GridCacheEntryRemovedException {
         CacheObject val = null;
-        GridCacheValueBytes valBytes = GridCacheValueBytes.nil();
 
-// TODO IGNITE-51.
-//        synchronized (this) {
-//            checkObsolete();
-//
-//            if (ver == null || this.ver.equals(ver)) {
-//                val = this.val;
-//                ver = this.ver;
-//                valBytes = valueBytesUnlocked();
-//
-//                if (valBytes.isNull() && cctx.offheapTiered() && valPtr != 0)
-//                    valBytes = offheapValueBytes();
-//            }
-//            else
-//                ver = null;
-//        }
-//
-//        if (valBytes.isNull()) {
-//            if (val != null)
-//                valBytes = (val instanceof byte[]) ? GridCacheValueBytes.plain(val) :
-//                    GridCacheValueBytes.marshaled(CU.marshal(cctx.shared(), val));
-//
-//            if (ver != null && !isOffHeapValuesOnly()) {
-//                synchronized (this) {
-//                    checkObsolete();
-//
-//                    if (this.val == val)
-//                        this.valBytes = isStoreValueBytes() ? valBytes.getIfMarshaled() : null;
-//                }
-//            }
-//        }
+        synchronized (this) {
+            checkObsolete();
+
+            if (ver == null || this.ver.equals(ver))
+                val = valueBytesUnlocked();
+        }
 
-        return valBytes;
+        return val;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index 012d393..ef04a8c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -678,7 +678,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
             return null;
 
         return read(entry.key(),
-            entry.getOrMarshalKeyBytes(),
+            entry.key().valueBytes(cctx),
             entry.partition(),
             locked,
             readOffheap,
@@ -698,8 +698,6 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         int part = cctx.affinity().partition(key);
 
-        byte[] keyBytes = entry.getOrMarshalKeyBytes();
-
         IgniteBiTuple<Long, Integer> ptr = offheap.valuePointer(spaceName, part, key, key.valueBytes(cctx));
 
         if (ptr != null) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
index 95b9095..028ab12 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
@@ -67,12 +67,14 @@ public class GridCacheTtlUpdateRequest extends GridCacheMessage {
     }
 
     /**
+     * @param cacheId Cache ID.
      * @param topVer Topology version.
      * @param ttl TTL.
      */
-    public GridCacheTtlUpdateRequest(long topVer, long ttl) {
+    public GridCacheTtlUpdateRequest(int cacheId, long topVer, long ttl) {
         assert ttl >= 0 || ttl == CU.TTL_ZERO : ttl;
 
+        this.cacheId = cacheId;
         this.topVer = topVer;
         this.ttl = ttl;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 3ce5cd3..c787261 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -281,7 +281,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                     log.debug("Replacing obsolete entry in remote transaction [entry=" + entry + ", tx=" + this + ']');
 
                 // Replace the entry.
-                txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()), null);
+                txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()));
             }
         }
     }
@@ -327,13 +327,13 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
             IgniteTxEntry rmv = readMap.remove(e.txKey());
 
             if (rmv != null) {
-                e.cached(rmv.cached(), null);
+                e.cached(rmv.cached());
 
                 writeMap.put(e.txKey(), e);
             }
             // If lock is explicit.
             else {
-                e.cached(e.context().cache().entryEx(e.key()), null);
+                e.cached(e.context().cache().entryEx(e.key()));
 
                 // explicit lock.
                 writeMap.put(e.txKey(), e);
@@ -456,7 +456,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                         if (log.isDebugEnabled())
                             log.debug("Got removed entry while committing (will retry): " + txEntry);
 
-                        txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()), null);
+                        txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()));
                     }
                 }
             }
@@ -484,7 +484,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                     GridCacheEntryEx cached = txEntry.cached();
 
                                     if (cached == null)
-                                        txEntry.cached(cached = cacheCtx.cache().entryEx(txEntry.key()), null);
+                                        txEntry.cached(cached = cacheCtx.cache().entryEx(txEntry.key()));
 
                                     if (near() && cacheCtx.dr().receiveEnabled()) {
                                         cached.markObsolete(xidVer);
@@ -563,19 +563,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
 
                                             // Keep near entry up to date.
                                             if (nearCached != null) {
-                                                CacheObject val0 = null;
-
-                                                GridCacheValueBytes valBytesTuple = cached.valueBytes();
-
-                                                if (!valBytesTuple.isNull()) {
-// TODO IGNITE-51.
-//                                                    if (valBytesTuple.isPlain())
-//                                                        val0 = (V)valBytesTuple.get();
-//                                                    else
-//                                                        valBytes0 = valBytesTuple.get();
-                                                }
-                                                else
-                                                    val0 = cached.rawGet();
+                                                CacheObject val0 = cached.valueBytes();
 
                                                 nearCached.updateOrEvict(xidVer,
                                                     val0,
@@ -621,19 +609,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                                 cached.updateTtl(null, txEntry.ttl());
 
                                             if (nearCached != null) {
-                                                CacheObject val0 = null;
-
-                                                GridCacheValueBytes valBytesTuple = cached.valueBytes();
-
-                                                if (!valBytesTuple.isNull()) {
-// TODO IGNITE-51.
-//                                                    if (valBytesTuple.isPlain())
-//                                                        val0 = (V)valBytesTuple.get();
-//                                                    else
-//                                                        valBytes0 = valBytesTuple.get();
-                                                }
-                                                else
-                                                    val0 = cached.rawGet();
+                                                CacheObject val0 = cached.valueBytes();
 
                                                 nearCached.updateOrEvict(xidVer,
                                                     val0,
@@ -662,7 +638,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                         log.debug("Attempting to commit a removed entry (will retry): " + txEntry);
 
                                     // Renew cached entry.
-                                    txEntry.cached(cacheCtx.cache().entryEx(txEntry.key()), null);
+                                    txEntry.cached(cacheCtx.cache().entryEx(txEntry.key()));
                                 }
                             }
                         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 5febfcc..bb7d308 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -695,10 +695,9 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
                                 GridCacheTtlUpdateRequest req = reqMap.get(node);
 
                                 if (req == null) {
-                                    reqMap.put(node,
-                                        req = new GridCacheTtlUpdateRequest(topVer, expiryPlc.forAccess()));
-
-                                    req.cacheId(ctx.cacheId());
+                                    reqMap.put(node, req = new GridCacheTtlUpdateRequest(ctx.cacheId(),
+                                        topVer,
+                                        expiryPlc.forAccess()));
                                 }
 
                                 req.addEntry(e.getKey(), e.getValue());
@@ -718,10 +717,9 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
                                 GridCacheTtlUpdateRequest req = reqMap.get(node);
 
                                 if (req == null) {
-                                    reqMap.put(node, req = new GridCacheTtlUpdateRequest(topVer,
+                                    reqMap.put(node, req = new GridCacheTtlUpdateRequest(ctx.cacheId(),
+                                        topVer,
                                         expiryPlc.forAccess()));
-
-                                    req.cacheId(ctx.cacheId());
                                 }
 
                                 for (IgniteBiTuple<KeyCacheObject, GridCacheVersion> t : e.getValue())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index 6e4eac8..15648fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -311,16 +311,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
         if (isNew() || !valid(-1) || deletedUnlocked())
             return null;
         else {
-            CacheObject val0 = val;
-
-            if (val0 == null && valPtr != 0) {
-                IgniteBiTuple<byte[], Boolean> t = valueBytes0();
-
-                if (t.get2())
-                    val0 = cctx.toCacheObject(t.get1(), null);
-                else
-                    val0 = cctx.toCacheObject(null, t.get1());
-            }
+            CacheObject val0 = valueBytesUnlocked();
 
             return F.t(ver, val0, null);
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index e9674c8..8eb0809 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -419,12 +419,12 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
                         for (Iterator<GridCacheEntryInfo> it = infos.iterator(); it.hasNext();) {
                             GridCacheEntryInfo info = it.next();
 
-                            CacheObject v = map.get(info.key());
+                            Object v = map.get(info.key());
 
                             if (v == null)
                                 it.remove();
-                            else
-                                info.value(v);
+                            else if (!skipVals)
+                                info.value((CacheObject)v);
                         }
 
                         return infos;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index 056c3ad..6fcc7f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -906,7 +906,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
                 assert added.dhtLocal();
 
                 if (added.ownerVersion() != null)
-                    req.owned(e.key(), e.getOrMarshalKeyBytes(), added.ownerVersion());
+                    req.owned(e.key(), added.ownerVersion());
 
                 break;
             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
index 4f54f47..87c786d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
@@ -260,7 +260,7 @@ public class GridDhtLockRequest extends GridDistributedLockRequest {
      * @param keyBytes Key bytes.
      * @param ownerMapped Owner mapped version.
      */
-    public void owned(KeyCacheObject key, byte[] keyBytes, GridCacheVersion ownerMapped) {
+    public void owned(KeyCacheObject key, GridCacheVersion ownerMapped) {
         if (owned == null)
             owned = new GridLeanMap<>(3);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index d2b7d36..060b02c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -209,7 +209,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
                 if (e.cached().obsolete()) {
                     GridCacheEntryEx cached = cacheCtx.cache().entryEx(e.key());
 
-                    e.cached(cached, cached.keyBytes());
+                    e.cached(cached);
                 }
 
                 if (e.cached().detached() || e.cached().isLocal())
@@ -234,7 +234,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
                     catch (GridCacheEntryRemovedException ignore) {
                         GridCacheEntryEx cached = cacheCtx.cache().entryEx(e.key());
 
-                        e.cached(cached, cached.keyBytes());
+                        e.cached(cached);
                     }
                 }
             }
@@ -462,7 +462,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
 
                 GridDhtCacheEntry cached = dhtCache.entryExx(entry.key(), topologyVersion());
 
-                entry.cached(cached, null);
+                entry.cached(cached);
 
                 GridCacheVersion explicit = entry.explicitVersion();
 
@@ -562,7 +562,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
                     if (read)
                         txEntry.ttl(accessTtl);
 
-                    txEntry.cached(cached, null);
+                    txEntry.cached(cached);
 
                     addReader(msgId, cached, txEntry, topVer);
                 }
@@ -710,7 +710,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
                 }
                 catch (GridCacheEntryRemovedException ignored) {
                     // Retry.
-                    txEntry.cached(txEntry.context().dht().entryExx(key.key(), topologyVersion()), null);
+                    txEntry.cached(txEntry.context().dht().entryExx(key.key(), topologyVersion()));
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 7f9022a..11101fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -429,7 +429,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
             if (entry == null) {
                 entry = (GridDistributedCacheEntry)cacheCtx.cache().entryEx(txEntry.key());
 
-                txEntry.cached(entry, null);
+                txEntry.cached(entry);
             }
 
             if (tx.optimistic() && txEntry.explicitVersion() == null) {
@@ -455,7 +455,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
 
                     entry = (GridDistributedCacheEntry)cacheCtx.cache().entryEx(txEntry.key());
 
-                    txEntry.cached(entry, null);
+                    txEntry.cached(entry);
                 }
             }
         }
@@ -625,7 +625,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
 
                         GridCacheVersion dhtVer = entry.version();
 
-                        CacheObject val0 = entry.rawGet();
+                        CacheObject val0 = entry.valueBytes();
 
                         if (val0 != null)
                             res.addOwnedValue(txEntry.txKey(), dhtVer, val0);
@@ -634,7 +634,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
                     }
                     catch (GridCacheEntryRemovedException ignored) {
                         // Retry.
-                        txEntry.cached(cacheCtx.cache().entryEx(txEntry.key()), null);
+                        txEntry.cached(cacheCtx.cache().entryEx(txEntry.key()));
                     }
                 }
             }
@@ -655,7 +655,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
                     GridCacheVersion dhtVer = entry.version();
 
                     if (ver.getValue() == null || !ver.getValue().equals(dhtVer)) {
-                        CacheObject val0 = entry.rawGet();
+                        CacheObject val0 = entry.valueBytes();
 
                         res.addOwnedValue(txEntry.txKey(), dhtVer, val0);
                     }
@@ -664,7 +664,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
                 }
                 catch (GridCacheEntryRemovedException ignored) {
                     // Retry.
-                    txEntry.cached(cacheCtx.cache().entryEx(txEntry.key()), null);
+                    txEntry.cached(cacheCtx.cache().entryEx(txEntry.key()));
                 }
             }
         }
@@ -1015,7 +1015,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
             catch (GridCacheEntryRemovedException ignore) {
                 cached = dht.entryExx(entry.key());
 
-                entry.cached(cached, cached.keyBytes());
+                entry.cached(cached);
             }
         }
 
@@ -1195,7 +1195,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
                                     if (e == null)
                                         break;
 
-                                    entry.cached(e, null);
+                                    entry.cached(e);
                                 }
                             }
                         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
index 60c9c2f..cfec044 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
@@ -167,16 +167,20 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
     @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
         super.prepareMarshal(ctx);
 
-        GridCacheContext cctx = ctx.cacheContext(cacheId);
-
         if (nearEvicted != null) {
-            for (IgniteTxKey key : nearEvicted)
+            for (IgniteTxKey key : nearEvicted) {
+                GridCacheContext cctx = ctx.cacheContext(key.cacheId());
+
                 key.prepareMarshal(cctx);
+            }
         }
 
         if (preloadEntries != null) {
-            for (GridCacheEntryInfo info : preloadEntries)
+            for (GridCacheEntryInfo info : preloadEntries) {
+                GridCacheContext cctx = ctx.cacheContext(info.cacheId());
+
                 info.marshal(cctx);
+            }
         }
     }
 
@@ -184,16 +188,20 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
     @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
         super.finishUnmarshal(ctx, ldr);
 
-        GridCacheContext cctx = ctx.cacheContext(cacheId);
-
         if (nearEvicted != null) {
-            for (IgniteTxKey key : nearEvicted)
+            for (IgniteTxKey key : nearEvicted) {
+                GridCacheContext cctx = ctx.cacheContext(key.cacheId());
+
                 key.finishUnmarshal(cctx, ldr);
+            }
         }
 
         if (preloadEntries != null) {
-            for (GridCacheEntryInfo info : preloadEntries)
+            for (GridCacheEntryInfo info : preloadEntries) {
+                GridCacheContext cctx = ctx.cacheContext(info.cacheId());
+
                 info.unmarshal(cctx, ldr);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
index 3f9ce8b..8dc91c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
@@ -264,7 +264,7 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
             checkInternal(entry.txKey());
 
             // Initialize cache entry.
-            entry.cached(cached, null);
+            entry.cached(cached);
 
             writeMap.put(entry.txKey(), entry);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index adcaebf..2012252 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -2032,7 +2032,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 else
                                     res.addNearTtl(idx, updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE);
 
-                                if (writeVal != null || !entry.valueBytes().isNull()) {
+                                if (writeVal != null || entry.hasValue()) {
                                     IgniteInternalFuture<Boolean> f = entry.addReader(node.id(), req.messageId(), topVer);
 
                                     assert f == null : f;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index a9a26c6..567bf67 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -327,12 +327,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
 
         GridCacheReturn ret = (GridCacheReturn)res;
 
-        if (op != TRANSFORM && ret != null) {
-            CacheObject val = (CacheObject)ret.value();
-
-            ret.value(CU.value(val, cctx, false));
-        }
-
         Object retval = res == null ? null : rawRetval ? ret : this.retval ? ret.value() : ret.success();
 
         if (op == TRANSFORM && retval == null)
@@ -362,6 +356,14 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
             return;
         }
 
+        GridCacheReturn ret = res.returnValue();
+
+        if (op != TRANSFORM && ret != null) {
+            CacheObject val = (CacheObject)ret.value();
+
+            ret.value(CU.value(val, cctx, false));
+        }
+
         Boolean single0 = single;
 
         if (single0 != null && single0) {
@@ -374,13 +376,13 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
                 onDone(addFailedKeys(res.failedKeys(), res.error()));
             else {
                 if (op == TRANSFORM) {
-                    if (res.returnValue() != null)
-                        addInvokeResults(res.returnValue());
+                    if (ret != null)
+                        addInvokeResults(ret);
 
                     onDone(opRes);
                 }
                 else {
-                    GridCacheReturn<?> opRes0 = opRes = res.returnValue();
+                    GridCacheReturn<?> opRes0 = opRes = ret;
 
                     onDone(opRes0);
                 }
@@ -398,11 +400,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
                     if (op == TRANSFORM) {
                         assert !req.fastMap();
 
-                        if (res.returnValue() != null)
-                            addInvokeResults(res.returnValue());
+                        if (ret != null)
+                            addInvokeResults(ret);
                     }
                     else if (req.fastMap() && req.hasPrimary())
-                        opRes = res.returnValue();
+                        opRes = ret;
                 }
 
                 mappings.remove(nodeId);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 29e9730..441c2fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -270,7 +270,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
         if (inTx()) {
             IgniteTxEntry txEntry = tx.entry(entry.txKey());
 
-            txEntry.cached(entry, null);
+            txEntry.cached(entry);
 
             if (cand != null) {
                 if (!tx.implicit())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
index b9602d9..154d99e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
@@ -67,11 +67,8 @@ public class GridDhtDetachedCacheEntry extends GridDistributedCacheEntry {
     }
 
     /** {@inheritDoc} */
-    @Override protected GridCacheValueBytes valueBytesUnlocked() {
-        return null;
-// TODO IGNITE-51.
-//        return (val != null && val instanceof byte[]) ? GridCacheValueBytes.plain(val) :
-//            valBytes == null ? GridCacheValueBytes.nil() : GridCacheValueBytes.marshaled(valBytes);
+    @Override protected CacheObject valueBytesUnlocked() {
+        return val;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index b47d288..d06ca5b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -273,16 +273,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
         if (dhtVer == null)
             return null;
         else {
-            CacheObject val0 = val;
-
-            if (val0 == null && valPtr != 0) {
-                IgniteBiTuple<byte[], Boolean> t = valueBytes0();
-
-                if (t.get2())
-                    val0 = cctx.toCacheObject(t.get1(), null);
-                else
-                    val0 = cctx.toCacheObject(null, t.get1());
-            }
+            CacheObject val0 = valueBytesUnlocked();
 
             return F.t(ver, val0, null);
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 9da55ea..c855b47 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -316,7 +316,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
         if (inTx()) {
             IgniteTxEntry txEntry = tx.entry(entry.txKey());
 
-            txEntry.cached(entry, null);
+            txEntry.cached(entry);
         }
 
         if (c != null)

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 8d6800d..ef2899a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -582,7 +582,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
                             ", tx=" + this + ']');
 
                     // Replace the entry.
-                    txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()), entry.keyBytes());
+                    txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()));
                 }
             }
         }
@@ -1129,7 +1129,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
             if (cached.obsoleteVersion() != null) {
                 cached = cacheCtx.colocated().entryExx(key.key(), topologyVersion(), true);
 
-                txEntry.cached(cached, null);
+                txEntry.cached(cached);
             }
 
             return cached;
@@ -1156,7 +1156,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
             if (cached.obsoleteVersion() != null) {
                 cached = cacheCtx.colocated().entryExx(key.key(), topVer, true);
 
-                txEntry.cached(cached, null);
+                txEntry.cached(cached);
             }
 
             return cached;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
index 491a171..9bb3aa7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
@@ -778,11 +778,11 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
 
         // Must re-initialize cached entry while holding topology lock.
         if (cacheCtx.isNear())
-            entry.cached(cacheCtx.nearTx().entryExx(entry.key(), topVer), null);
+            entry.cached(cacheCtx.nearTx().entryExx(entry.key(), topVer));
         else if (!cacheCtx.isLocal())
-            entry.cached(cacheCtx.colocated().entryExx(entry.key(), topVer, true), null);
+            entry.cached(cacheCtx.colocated().entryExx(entry.key(), topVer, true));
         else
-            entry.cached(cacheCtx.local().entryEx(entry.key(), topVer), null);
+            entry.cached(cacheCtx.local().entryEx(entry.key(), topVer));
 
         if (cacheCtx.isNear() || cacheCtx.isLocal()) {
             if (waitLock && entry.explicitVersion() == null) {
@@ -812,7 +812,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
                     break;
                 }
                 catch (GridCacheEntryRemovedException ignore) {
-                    entry.cached(cacheCtx.near().entryEx(entry.key()), null);
+                    entry.cached(cacheCtx.near().entryEx(entry.key()));
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
index 2b40e88..ccd4c8c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
@@ -299,7 +299,7 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter {
                 }
                 else {
                     // Initialize cache entry.
-                    entry.cached(cached, null);
+                    entry.cached(cached);
 
                     writeMap.put(entry.txKey(), entry);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalTxFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalTxFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalTxFuture.java
index 619ce38..bc248aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalTxFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalTxFuture.java
@@ -218,7 +218,7 @@ final class GridLocalTxFuture<K, V> extends GridFutureAdapter<IgniteInternalTx>
                     if (log.isDebugEnabled())
                         log.debug("Got removed entry in checkLocks method (will retry): " + txEntry);
 
-                    txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()), null);
+                    txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()));
                 }
             }
         }
@@ -264,7 +264,7 @@ final class GridLocalTxFuture<K, V> extends GridFutureAdapter<IgniteInternalTx>
                     if (log.isDebugEnabled())
                         log.debug("Got removed entry in onOwnerChanged method (will retry): " + txEntry);
 
-                    txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()), null);
+                    txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()));
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
index d98b254..43aaec3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
@@ -18,144 +18,145 @@
 package org.apache.ignite.internal.processors.cache.query.continuous;
 
 import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.managers.deployment.*;
 import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.marshaller.*;
+import org.apache.ignite.plugin.extensions.communication.*;
 import org.jetbrains.annotations.*;
 
-import java.io.*;
-
-import static org.apache.ignite.internal.processors.cache.GridCacheValueBytes.*;
+import javax.cache.event.*;
+import java.nio.*;
 
 /**
  * Continuous query entry.
  */
-class CacheContinuousQueryEntry<K, V> implements GridCacheDeployable, Externalizable {
+public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
+    /** */
+    private static final EventType[] EVT_TYPE_VALS = EventType.values();
+
+    /**
+     * @param ord Event type ordinal value.
+     * @return Event type.
+     */
+    @Nullable public static EventType eventTypeFromOrdinal(int ord) {
+        return ord >= 0 && ord < EVT_TYPE_VALS.length ? EVT_TYPE_VALS[ord] : null;
+    }
+
     /** */
-    private static final long serialVersionUID = 0L;
+    private EventType evtType;
 
     /** Key. */
     @GridToStringInclude
-    private K key;
+    private KeyCacheObject key;
 
     /** New value. */
     @GridToStringInclude
-    private V newVal;
+    private CacheObject newVal;
 
     /** Old value. */
     @GridToStringInclude
-    private V oldVal;
-
-    /** Serialized key. */
-    @GridToStringExclude
-    private byte[] keyBytes;
-
-    /** Serialized value. */
-    @GridToStringExclude
-    private GridCacheValueBytes newValBytes;
-
-    /** Serialized value. */
-    @GridToStringExclude
-    private GridCacheValueBytes oldValBytes;
+    private CacheObject oldVal;
 
     /** Cache name. */
-    private String cacheName;
+    private int cacheId;
 
     /** Deployment info. */
     @GridToStringExclude
+    @GridDirectTransient
     private GridDeploymentInfo depInfo;
 
+    /**
+     * Required by {@link org.apache.ignite.plugin.extensions.communication.Message}.
+     */
     public CacheContinuousQueryEntry() {
         // No-op.
     }
 
-    CacheContinuousQueryEntry(K key, @Nullable V newVal, @Nullable GridCacheValueBytes newValBytes, @Nullable V oldVal,
-        @Nullable GridCacheValueBytes oldValBytes) {
-
+    /**
+     * @param cacheId Cache ID.
+     * @param evtType Event type.
+     * @param key Key.
+     * @param newVal New value.
+     * @param oldVal Old value.
+     */
+    CacheContinuousQueryEntry(
+        int cacheId,
+        EventType evtType,
+        KeyCacheObject key,
+        @Nullable CacheObject newVal,
+        @Nullable CacheObject oldVal) {
+        this.cacheId = cacheId;
+        this.evtType = evtType;
         this.key = key;
         this.newVal = newVal;
-        this.newValBytes = newValBytes;
         this.oldVal = oldVal;
-        this.oldValBytes = oldValBytes;
     }
 
     /**
-     * @param cacheName Cache name.
+     * @return Cache ID.
      */
-    void cacheName(String cacheName) {
-        this.cacheName = cacheName;
+    int cacheId() {
+        return cacheId;
     }
 
     /**
-     * @return cache name.
+     * @return Event type.
      */
-    String cacheName() {
-        return cacheName;
+    EventType eventType() {
+        return evtType;
     }
 
     /**
-     * @param marsh Marshaller.
+     * @param cctx Cache context.
      * @throws IgniteCheckedException In case of error.
      */
-    void p2pMarshal(Marshaller marsh) throws IgniteCheckedException {
-        assert marsh != null;
-
+    void prepareMarshal(GridCacheContext cctx) throws IgniteCheckedException {
         assert key != null;
 
-        keyBytes = marsh.marshal(key);
+        key.prepareMarshal(cctx.cacheObjectContext());
 
-        if (newValBytes == null || newValBytes.isNull())
-            newValBytes = newVal != null ?
-                newVal instanceof byte[] ? plain(newVal) : marshaled(marsh.marshal(newVal)) : null;
+        if (newVal != null)
+            newVal.prepareMarshal(cctx.cacheObjectContext());
 
-        if (oldValBytes == null || oldValBytes.isNull())
-            oldValBytes = oldVal != null ?
-                oldVal instanceof byte[] ? plain(oldVal) : marshaled(marsh.marshal(oldVal)) : null;
+        if (oldVal != null)
+            oldVal.prepareMarshal(cctx.cacheObjectContext());
     }
 
     /**
-     * @param marsh Marshaller.
+     * @param cctx Cache context.
      * @param ldr Class loader.
      * @throws IgniteCheckedException In case of error.
      */
-    void p2pUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException {
-        assert marsh != null;
+    void unmarshal(GridCacheContext cctx, @Nullable ClassLoader ldr) throws IgniteCheckedException {
+        key.finishUnmarshal(cctx, ldr);
 
-        assert key == null : "Key should be null: " + key;
-        assert newVal == null : "New value should be null: " + newVal;
-        assert oldVal == null : "Old value should be null: " + oldVal;
-        assert keyBytes != null;
+        if (newVal != null)
+            newVal.finishUnmarshal(cctx, ldr);
 
-        key = marsh.unmarshal(keyBytes, ldr);
-
-        if (newValBytes != null && !newValBytes.isNull())
-            newVal = newValBytes.isPlain() ? (V)newValBytes.get() : marsh.<V>unmarshal(newValBytes.get(), ldr);
-
-        if (oldValBytes != null && !oldValBytes.isNull())
-            oldVal = oldValBytes.isPlain() ? (V)oldValBytes.get() : marsh.<V>unmarshal(oldValBytes.get(), ldr);
+        if (oldVal != null)
+            oldVal.finishUnmarshal(cctx, ldr);
     }
 
     /**
      * @return Key.
      */
-    K key() {
+    KeyCacheObject key() {
         return key;
     }
 
     /**
      * @return New value.
      */
-    V value() {
+    CacheObject value() {
         return newVal;
     }
 
     /**
      * @return Old value.
      */
-    V oldValue() {
+    CacheObject oldValue() {
         return oldVal;
     }
 
@@ -170,62 +171,117 @@ class CacheContinuousQueryEntry<K, V> implements GridCacheDeployable, Externaliz
     }
 
     /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        boolean b = keyBytes != null;
-
-        out.writeBoolean(b);
-
-        if (b) {
-            U.writeByteArray(out, keyBytes);
-
-            if (newValBytes != null && !newValBytes.isNull()) {
-                out.writeBoolean(true);
-                out.writeBoolean(newValBytes.isPlain());
-                U.writeByteArray(out, newValBytes.get());
-            }
-            else
-                out.writeBoolean(false);
-
-            if (oldValBytes != null && !oldValBytes.isNull()) {
-                out.writeBoolean(true);
-                out.writeBoolean(oldValBytes.isPlain());
-                U.writeByteArray(out, oldValBytes.get());
-            }
-            else
-                out.writeBoolean(false);
-
-            U.writeString(out, cacheName);
-            out.writeObject(depInfo);
+    @Override public byte directType() {
+        return 96;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
         }
-        else {
-            out.writeObject(key);
-            out.writeObject(newVal);
-            out.writeObject(oldVal);
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeInt("cacheId", cacheId))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeByte("evtType", evtType != null ? (byte)evtType.ordinal() : -1))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeMessage("key", key))
+                    return false;
+
+                writer.incrementState();
+
+            case 3:
+                if (!writer.writeMessage("newVal", newVal))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeMessage("oldVal", oldVal))
+                    return false;
+
+                writer.incrementState();
+
         }
+
+        return true;
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        boolean b = in.readBoolean();
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
 
-        if (b) {
-            keyBytes = U.readByteArray(in);
+        if (!reader.beforeMessageRead())
+            return false;
 
-            if (in.readBoolean())
-                newValBytes = in.readBoolean() ? plain(U.readByteArray(in)) : marshaled(U.readByteArray(in));
+        switch (reader.state()) {
+            case 0:
+                cacheId = reader.readInt("cacheId");
 
-            if (in.readBoolean())
-                oldValBytes = in.readBoolean() ? plain(U.readByteArray(in)) : marshaled(U.readByteArray(in));
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                byte evtTypeOrd;
+
+                evtTypeOrd = reader.readByte("evtType");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                evtType = eventTypeFromOrdinal(evtTypeOrd);
+
+                reader.incrementState();
+
+            case 2:
+                key = reader.readMessage("key");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 3:
+                newVal = reader.readMessage("newVal");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                oldVal = reader.readMessage("oldVal");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
 
-            cacheName = U.readString(in);
-            depInfo = (GridDeploymentInfo)in.readObject();
-        }
-        else {
-            key = (K)in.readObject();
-            newVal = (V)in.readObject();
-            oldVal = (V)in.readObject();
         }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 5;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
index c90ae34..1bdadaf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache.query.continuous;
 
+import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 
@@ -27,43 +28,45 @@ import javax.cache.event.*;
  * Continuous query event.
  */
 class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K, V> {
+    /** */
+    private final GridCacheContext cctx;
+
     /** Entry. */
     @GridToStringExclude
-    private final CacheContinuousQueryEntry<K, V> e;
+    private final CacheContinuousQueryEntry e;
 
     /**
-     * @param source Source cache.
-     * @param eventType Event type.
+     * @param src Source cache.
+     * @param cctx Cache context.
      * @param e Entry.
      */
-    CacheContinuousQueryEvent(Cache source, EventType eventType, CacheContinuousQueryEntry<K, V> e) {
-        super(source, eventType);
-
-        assert e != null;
+    CacheContinuousQueryEvent(Cache src, GridCacheContext cctx, CacheContinuousQueryEntry e) {
+        super(src, e.eventType());
 
+        this.cctx = cctx;
         this.e = e;
     }
 
     /**
      * @return Entry.
      */
-    CacheContinuousQueryEntry<K, V> entry() {
+    CacheContinuousQueryEntry entry() {
         return e;
     }
 
     /** {@inheritDoc} */
     @Override public K getKey() {
-        return e.key();
+        return e.key().value(cctx, false);
     }
 
     /** {@inheritDoc} */
     @Override public V getValue() {
-        return e.value();
+        return CU.value(e.value(), cctx, false);
     }
 
     /** {@inheritDoc} */
     @Override public V getOldValue() {
-        return e.oldValue();
+        return CU.value(e.oldValue(), cctx, false);
     }
 
     /** {@inheritDoc} */
@@ -81,7 +84,10 @@ class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K, V> {
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return S.toString(CacheContinuousQueryEvent.class, this, "key", e.key(), "newVal", e.value(), "oldVal",
-            e.oldValue(), "cacheName", e.cacheName());
+        return S.toString(CacheContinuousQueryEvent.class, this,
+            "evtType", getEventType(),
+            "key", getKey(),
+            "newVal", getValue(),
+            "oldVal", getOldValue());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 9502b3f..53a2cdb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -212,17 +212,17 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
 
                             if (ctx.config().isPeerClassLoadingEnabled() && node != null &&
                                 U.hasCache(node, cacheName)) {
-                                evt.entry().p2pMarshal(ctx.config().getMarshaller());
-
-                                evt.entry().cacheName(cacheName);
+                                evt.entry().prepareMarshal(cctx);
 
                                 GridCacheDeploymentManager depMgr =
                                     ctx.cache().internalCache(cacheName).context().deploy();
 
                                 depMgr.prepare(evt.entry());
                             }
+                            else
+                                evt.entry().prepareMarshal(cctx);
 
-                            ctx.continuous().addNotification(nodeId, routineId, evt, topic, sync);
+                            ctx.continuous().addNotification(nodeId, routineId, evt.entry(), topic, sync, true);
                         }
                         catch (IgniteCheckedException ex) {
                             U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex);
@@ -302,46 +302,42 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
         assert objs != null;
         assert ctx != null;
 
-        Collection<CacheEntryEvent<? extends K, ? extends V>> evts =
-            (Collection<CacheEntryEvent<? extends K, ? extends V>>)objs;
-
-        if (ctx.config().isPeerClassLoadingEnabled()) {
-            for (CacheEntryEvent<? extends K, ? extends V> evt : evts) {
-                assert evt instanceof CacheContinuousQueryEvent;
+        Collection<CacheContinuousQueryEntry> entries = (Collection<CacheContinuousQueryEntry>)objs;
 
-                CacheContinuousQueryEntry<? extends K, ? extends V> e = ((CacheContinuousQueryEvent)evt).entry();
+        final GridCacheContext cctx = cacheContext(ctx);
 
-                GridCacheAdapter cache = ctx.cache().internalCache(e.cacheName());
+        for (CacheContinuousQueryEntry e : entries) {
+            GridCacheDeploymentManager depMgr = cctx.deploy();
 
-                ClassLoader ldr = null;
+            ClassLoader ldr = depMgr.globalLoader();
 
-                if (cache != null) {
-                    GridCacheDeploymentManager depMgr = cache.context().deploy();
+            if (ctx.config().isPeerClassLoadingEnabled()) {
+                GridDeploymentInfo depInfo = e.deployInfo();
 
-                    GridDeploymentInfo depInfo = e.deployInfo();
-
-                    if (depInfo != null) {
-                        depMgr.p2pContext(nodeId, depInfo.classLoaderId(), depInfo.userVersion(), depInfo.deployMode(),
-                            depInfo.participants(), depInfo.localDeploymentOwner());
-                    }
-
-                    ldr = depMgr.globalLoader();
-                }
-                else {
-                    U.warn(ctx.log(getClass()), "Received cache event for cache that is not configured locally " +
-                        "when peer class loading is enabled: " + e.cacheName() + ". Will try to unmarshal " +
-                        "with default class loader.");
+                if (depInfo != null) {
+                    depMgr.p2pContext(nodeId, depInfo.classLoaderId(), depInfo.userVersion(), depInfo.deployMode(),
+                        depInfo.participants(), depInfo.localDeploymentOwner());
                 }
+            }
 
-                try {
-                    e.p2pUnmarshal(ctx.config().getMarshaller(), ldr);
-                }
-                catch (IgniteCheckedException ex) {
-                    U.error(ctx.log(getClass()), "Failed to unmarshal entry.", ex);
-                }
+            try {
+                e.unmarshal(cctx, ldr);
+            }
+            catch (IgniteCheckedException ex) {
+                U.error(ctx.log(getClass()), "Failed to unmarshal entry.", ex);
             }
         }
 
+        final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name());
+
+        Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries,
+            new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends V>>() {
+                @Override public CacheEntryEvent<? extends K, ? extends V> apply(CacheContinuousQueryEntry e) {
+                    return new CacheContinuousQueryEvent<K, V>(cache, cctx, e);
+                };
+            }
+        );
+
         locLsnr.onUpdated(evts);
     }