You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/03/02 22:22:35 UTC

[5/6] incubator-ignite git commit: # ignite-51

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index b5f8666..c336d00 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -116,18 +116,14 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
      * @param e Cache entry.
      * @param key Key.
      * @param newVal New value.
-     * @param newBytes New value bytes.
      * @param oldVal Old value.
-     * @param oldBytes Old value bytes.
      * @param preload Whether update happened during preloading.
      * @throws IgniteCheckedException In case of error.
      */
     public void onEntryUpdated(GridCacheEntryEx e,
         KeyCacheObject key,
         CacheObject newVal,
-        GridCacheValueBytes newBytes,
         CacheObject oldVal,
-        GridCacheValueBytes oldBytes,
         boolean preload)
         throws IgniteCheckedException
     {
@@ -149,8 +145,8 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
         if (F.isEmpty(lsnrCol))
             return;
 
-        boolean hasNewVal = newVal != null || (newBytes != null && !newBytes.isNull());
-        boolean hasOldVal = oldVal != null || (oldBytes != null && !oldBytes.isNull());
+        boolean hasNewVal = newVal != null;
+        boolean hasOldVal = oldVal != null;
 
         if (!hasNewVal && !hasOldVal)
             return;
@@ -169,27 +165,26 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
             if (!initialized) {
                 if (lsnr.oldValueRequired()) {
                     oldVal = cctx.unwrapTemporary(oldVal);
-// TODO IGNITE-51.
-//                    if (oldVal == null && oldBytes != null && !oldBytes.isNull())
-//                        oldVal = oldBytes.isPlain() ? (V)oldBytes.get() : cctx.marshaller().<V>unmarshal(oldBytes.get
-//                            (), cctx.deploy().globalLoader());
-                }
 
-                if (newVal == null && newBytes != null && !newBytes.isNull()) {
-// TODO IGNITE-51.
-//                    newVal = newBytes.isPlain() ? (V) newBytes.get() : cctx.marshaller().<V>unmarshal(newBytes.get(),
-//                        cctx.deploy().globalLoader());
+                    if (oldVal != null)
+                        oldVal.finishUnmarshal(cctx, cctx.deploy().globalLoader());
                 }
+
+                if (newVal != null)
+                    newVal.finishUnmarshal(cctx, cctx.deploy().globalLoader());
+
+                initialized = true;
             }
 
-            CacheContinuousQueryEntry<K, V> e0 = new CacheContinuousQueryEntry<>(key.<K>value(cctx, false),
-                CU.<V>value(newVal, cctx, false),
-                newBytes,
-                lsnr.oldValueRequired() ? CU.<V>value(oldVal, cctx, false) : null,
-                lsnr.oldValueRequired() ? oldBytes : null);
+            CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry(
+                cctx.cacheId(),
+                evtType,
+                key,
+                newVal,
+                lsnr.oldValueRequired() ? oldVal : null);
 
             CacheContinuousQueryEvent<K, V> evt = new CacheContinuousQueryEvent<>(
-                cctx.kernalContext().cache().jcache(cctx.name()), evtType, e0);
+                cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
 
             lsnr.onEntryUpdated(evt, primary, recordIgniteEvt);
         }
@@ -199,10 +194,9 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
      * @param e Entry.
      * @param key Key.
      * @param oldVal Old value.
-     * @param oldBytes Old value bytes.
      * @throws IgniteCheckedException In case of error.
      */
-    public void onEntryExpired(GridCacheEntryEx e, KeyCacheObject key, CacheObject oldVal, GridCacheValueBytes oldBytes)
+    public void onEntryExpired(GridCacheEntryEx e, KeyCacheObject key, CacheObject oldVal)
         throws IgniteCheckedException {
         assert e != null;
         assert key != null;
@@ -223,26 +217,24 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
 
             for (CacheContinuousQueryListener<K, V> lsnr : lsnrCol.values()) {
                 if (!initialized) {
-                    if (lsnr.oldValueRequired()) {
+                    if (lsnr.oldValueRequired())
                         oldVal = cctx.unwrapTemporary(oldVal);
 
-                        if (oldVal == null && oldBytes != null && !oldBytes.isNull()) {
-// TODO IGNITE-51.
-//                            oldVal = oldBytes.isPlain() ? (V) oldBytes.get() :
-//                                cctx.marshaller().<V>unmarshal(oldBytes.get(), cctx.deploy().globalLoader());
-                        }
-                    }
+                    if (oldVal != null)
+                        oldVal.finishUnmarshal(cctx, cctx.deploy().globalLoader());
+
+                    initialized = true;
                 }
 
-               // TODO IGNITE-51.
-               CacheContinuousQueryEntry<K, V> e0 = new CacheContinuousQueryEntry<>(key.<K>value(cctx, false),
-                    null,
-                    null,
-                    lsnr.oldValueRequired() ? CU.<V>value(oldVal, cctx, false) : null,
-                    lsnr.oldValueRequired() ? oldBytes : null);
+               CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry(
+                   cctx.cacheId(),
+                   EXPIRED,
+                   key,
+                   null,
+                   lsnr.oldValueRequired() ? oldVal : null);
 
-                CacheContinuousQueryEvent<K, V> evt = new CacheContinuousQueryEvent<>(
-                    cctx.kernalContext().cache().jcache(cctx.name()), EXPIRED, e0);
+                CacheContinuousQueryEvent<K, V> evt = new CacheContinuousQueryEvent(
+                    cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
 
                 lsnr.onEntryUpdated(evt, primary, recordIgniteEvt);
             }
@@ -419,7 +411,7 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
             autoUnsubscribe, grp.predicate()).get();
 
         if (notifyExisting) {
-            final Iterator<Cache.Entry<K, V>> it = cctx.cache().entrySetx().iterator();
+            final Iterator<GridCacheEntryEx> it = cctx.cache().allEntries().iterator();
 
             locLsnr.onUpdated(new Iterable<CacheEntryEvent<? extends K, ? extends V>>() {
                 @Override public Iterator<CacheEntryEvent<? extends K, ? extends V>> iterator() {
@@ -456,11 +448,12 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
                                 if (!it.hasNext())
                                     break;
 
-                                Cache.Entry<K, V> e = it.next();
+                                GridCacheEntryEx e = it.next();
 
                                 next = new CacheContinuousQueryEvent<>(
-                                    cctx.kernalContext().cache().jcache(cctx.name()), CREATED,
-                                    new CacheContinuousQueryEntry<>(e.getKey(), e.getValue(), null, null, null));
+                                    cctx.kernalContext().cache().jcache(cctx.name()),
+                                    cctx,
+                                    new CacheContinuousQueryEntry(cctx.cacheId(), CREATED, e.key(), e.rawGet(), null));
 
                                 if (rmtFilter != null && !rmtFilter.evaluate(next))
                                     next = null;
@@ -686,8 +679,7 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
 
             Collection<CacheEntryEvent<? extends K, ? extends V>> evts = new ArrayList<>(1);
 
-            evts.add(new CacheContinuousQueryEvent<>(cache, evt.getEventType(),
-                ((CacheContinuousQueryEvent<? extends K, ? extends V>)evt).entry()));
+            evts.add(evt);
 
             return evts;
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 66eea11..db166fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -419,9 +419,8 @@ public class IgniteTxEntry implements GridPeerDeployAware, Externalizable, Optim
 
     /**
      * @param entry Cache entry.
-     * @param keyBytes Key bytes, possibly {@code null}.
      */
-    public void cached(GridCacheEntryEx entry, @Nullable byte[] keyBytes) {
+    public void cached(GridCacheEntryEx entry) {
         assert entry != null;
 
         assert entry.context() == ctx : "Invalid entry assigned to tx entry [txEntry=" + this +
@@ -576,32 +575,6 @@ public class IgniteTxEntry implements GridPeerDeployAware, Externalizable, Optim
     }
 
     /**
-     * @param cacheVal Value.
-     * @return New value.
-     */
-    public <V> V applyEntryProcessors(V cacheVal) {
-        Object val = cacheVal;
-        Object key = CU.value(this.key, ctx, false);
-
-        for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : entryProcessors()) {
-            try {
-                CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry<>(ctx, key, val);
-
-                EntryProcessor processor = t.get1();
-
-                processor.process(invokeEntry, t.get2());
-
-                val = invokeEntry.getValue();
-            }
-            catch (Exception ignore) {
-                // No-op.
-            }
-        }
-
-        return (V)val;
-    }
-
-    /**
      * @param entryProcessorsCol Collection of entry processors.
      */
     public void entryProcessors(

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 7f39503..d841c90 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -926,7 +926,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                 if (log.isDebugEnabled())
                                     log.debug("Got removed entry during transaction commit (will retry): " + txEntry);
 
-                                txEntry.cached(entryEx(cacheCtx, txEntry.txKey()), null);
+                                txEntry.cached(entryEx(cacheCtx, txEntry.txKey()));
                             }
                         }
                     }
@@ -1214,7 +1214,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                 txEntry.readValue(e.<V>value());
                         }
                         catch (GridCacheEntryRemovedException ignored) {
-                            txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topVer), null);
+                            txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topVer));
                         }
                     }
                 }
@@ -1487,7 +1487,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                 }
 
                                 if (txEntry != null)
-                                    txEntry.cached(entryEx(cacheCtx, txKey), null);
+                                    txEntry.cached(entryEx(cacheCtx, txKey));
 
                                 continue; // While loop.
                             }
@@ -1685,10 +1685,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
                                         txEntry.setAndMarkValid(val);
 
-                                        Object val0 = val.value(cacheCtx, false);
-
                                         if (!F.isEmpty(txEntry.entryProcessors()))
-                                            val0 = txEntry.applyEntryProcessors(val0);
+                                            val = txEntry.applyEntryProcessors(val);
 
                                         cacheCtx.addResult(retMap,
                                             cacheKey,
@@ -1710,7 +1708,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                         log.debug("Got removed exception in get postLock (will retry): " +
                                             cached);
 
-                                    txEntry.cached(entryEx(cacheCtx, txKey), null);
+                                    txEntry.cached(entryEx(cacheCtx, txKey));
                                 }
                                 catch (GridCacheFilterFailedException e) {
                                     // Failed value for the filter.
@@ -2443,7 +2441,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                     if (log.isDebugEnabled())
                         log.debug("Got removed entry in putAllAsync method (will retry): " + cached);
 
-                    txEntry.cached(entryEx(cached.context(), txEntry.txKey()), null);
+                    txEntry.cached(entryEx(cached.context(), txEntry.txKey()));
                 }
             }
         }
@@ -3236,7 +3234,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
             }
 
             // Keep old ttl value.
-            old.cached(entry, null);
+            old.cached(entry);
             old.filters(filter);
 
             // Update ttl if specified.
@@ -3292,7 +3290,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
                 entry = entryEx(entry.context(), txEntry.txKey(), topologyVersion());
 
-                txEntry.cached(entry, null);
+                txEntry.cached(entry);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 8f1007d..73f4ce7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -1544,7 +1544,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
 
                     try {
                         // Renew cache entry.
-                        txEntry1.cached(cacheCtx.cache().entryEx(txEntry1.key()), null);
+                        txEntry1.cached(cacheCtx.cache().entryEx(txEntry1.key()));
                     }
                     catch (GridDhtInvalidPartitionException e) {
                         assert tx.dht() : "Received invalid partition for non DHT transaction [tx=" +
@@ -1606,7 +1606,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                     GridCacheAdapter cache = cacheCtx.cache();
 
                     // Renew cache entry.
-                    txEntry.cached(cache.entryEx(txEntry.key()), null);
+                    txEntry.cached(cache.entryEx(txEntry.key()));
                 }
             }
         }
@@ -1638,7 +1638,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                         log.debug("Got removed entry in TM unlockMultiple(..) method (will retry): " + txEntry);
 
                     // Renew cache entry.
-                    txEntry.cached(cacheCtx.cache().entryEx(txEntry.key()), null);
+                    txEntry.cached(cacheCtx.cache().entryEx(txEntry.key()));
                 }
             }
         }
@@ -1907,7 +1907,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                         if (cached == null || cached.detached())
                             cached = write.context().cache().entryEx(entry.key(), tx.topologyVersion());
 
-                        recovered.cached(cached, cached.keyBytes());
+                        recovered.cached(cached);
 
                         tx.writeMap().put(entry.txKey(), recovered);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessage.java
index a207372..fe50fd8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessage.java
@@ -48,6 +48,10 @@ public class GridContinuousMessage implements Message {
     @GridDirectTransient
     private Object data;
 
+    /** Message data as a collection of messages (set instead of {@code data}). */
+    @GridDirectCollection(Message.class)
+    private Collection<Message> msgs;
+
     /** Serialized message data. */
     private byte[] dataBytes;
 
@@ -66,18 +70,24 @@ public class GridContinuousMessage implements Message {
      * @param routineId Consume ID.
      * @param futId Future ID.
      * @param data Optional message data.
+     * @param msgs If {@code true} then data is a collection of messages.
      */
     GridContinuousMessage(GridContinuousMessageType type,
         @Nullable UUID routineId,
         @Nullable IgniteUuid futId,
-        @Nullable Object data) {
+        @Nullable Object data,
+        boolean msgs) {
         assert type != null;
         assert routineId != null || type == MSG_EVT_ACK;
 
         this.type = type;
         this.routineId = routineId;
         this.futId = futId;
-        this.data = data;
+
+        if (msgs)
+            this.msgs = (Collection)data;
+        else
+            this.data = data;
     }
 
     /**
@@ -95,11 +105,18 @@ public class GridContinuousMessage implements Message {
     }
 
     /**
+     * @return {@code True} if data is a collection of messages.
+     */
+    public boolean messages() {
+        return msgs != null;
+    }
+
+    /**
      * @return Message data.
      */
     @SuppressWarnings("unchecked")
     public <T> T data() {
-        return (T)data;
+        return msgs != null ? (T)msgs : (T)data;
     }
 
     /**
@@ -155,12 +172,18 @@ public class GridContinuousMessage implements Message {
                 writer.incrementState();
 
             case 2:
-                if (!writer.writeUuid("routineId", routineId))
+                if (!writer.writeCollection("msgs", msgs, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 3:
+                if (!writer.writeUuid("routineId", routineId))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
                 if (!writer.writeByte("type", type != null ? (byte)type.ordinal() : -1))
                     return false;
 
@@ -196,7 +219,7 @@ public class GridContinuousMessage implements Message {
                 reader.incrementState();
 
             case 2:
-                routineId = reader.readUuid("routineId");
+                msgs = reader.readCollection("msgs", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -204,6 +227,14 @@ public class GridContinuousMessage implements Message {
                 reader.incrementState();
 
             case 3:
+                routineId = reader.readUuid("routineId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
                 byte typeOrd;
 
                 typeOrd = reader.readByte("type");
@@ -227,7 +258,7 @@ public class GridContinuousMessage implements Message {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 4;
+        return 5;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index eed273d..4f427aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.internal.util.worker.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.marshaller.*;
+import org.apache.ignite.plugin.extensions.communication.*;
 import org.apache.ignite.thread.*;
 import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
@@ -471,7 +472,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             // these nodes.
             for (Map.Entry<UUID, Collection<GridContinuousMessage>> e : pending.entrySet()) {
                 if (nodeIds.add(e.getKey()))
-                    e.getValue().add(new GridContinuousMessage(MSG_START_REQ, routineId, null, reqData));
+                    e.getValue().add(new GridContinuousMessage(MSG_START_REQ, routineId, null, reqData, false));
             }
 
             // Register routine locally.
@@ -523,7 +524,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
             // Send start requests.
             try {
-                GridContinuousMessage req = new GridContinuousMessage(MSG_START_REQ, routineId, null, reqData);
+                GridContinuousMessage req = new GridContinuousMessage(MSG_START_REQ, routineId, null, reqData, false);
 
                 sendWithRetries(nodes, req, null);
             }
@@ -629,14 +630,14 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 // Register acknowledge timeout (timeout object will be removed when
                 // future is completed).
                 fut.addTimeoutObject(new StopTimeoutObject(ackTimeout, routineId,
-                    new GridContinuousMessage(MSG_STOP_REQ, routineId, null, null)));
+                    new GridContinuousMessage(MSG_STOP_REQ, routineId, null, null, false)));
 
                 // Send stop requests.
                 try {
                     for (ClusterNode node : nodes) {
                         try {
                             sendWithRetries(node.id(),
-                                new GridContinuousMessage(MSG_STOP_REQ, routineId, null, null),
+                                new GridContinuousMessage(MSG_STOP_REQ, routineId, null, null, false),
                                 null);
                         }
                         catch (ClusterTopologyCheckedException ignored) {
@@ -673,10 +674,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         UUID routineId,
         @Nullable Object obj,
         @Nullable Object orderedTopic,
-        boolean sync)
+        boolean sync,
+        boolean msg)
         throws IgniteCheckedException {
         assert nodeId != null;
         assert routineId != null;
+        assert !msg || obj instanceof Message : obj;
 
         assert !nodeId.equals(ctx.localNodeId());
 
@@ -693,7 +696,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 syncMsgFuts.put(futId, fut);
 
                 try {
-                    sendNotification(nodeId, routineId, futId, F.asList(obj), orderedTopic);
+                    sendNotification(nodeId, routineId, futId, F.asList(obj), orderedTopic, msg);
                 }
                 catch (IgniteCheckedException e) {
                     syncMsgFuts.remove(futId);
@@ -707,7 +710,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 Collection<Object> toSnd = info.add(obj);
 
                 if (toSnd != null)
-                    sendNotification(nodeId, routineId, null, toSnd, orderedTopic);
+                    sendNotification(nodeId, routineId, null, toSnd, orderedTopic, msg);
             }
         }
     }
@@ -725,13 +728,16 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         UUID routineId,
         @Nullable IgniteUuid futId,
         Collection<Object> toSnd,
-        @Nullable Object orderedTopic) throws IgniteCheckedException {
+        @Nullable Object orderedTopic,
+        boolean msg) throws IgniteCheckedException {
         assert nodeId != null;
         assert routineId != null;
         assert toSnd != null;
         assert !toSnd.isEmpty();
 
-        sendWithRetries(nodeId, new GridContinuousMessage(MSG_EVT_NOTIFICATION, routineId, futId, toSnd), orderedTopic);
+        sendWithRetries(nodeId,
+            new GridContinuousMessage(MSG_EVT_NOTIFICATION, routineId, futId, toSnd, msg),
+            orderedTopic);
     }
 
     /**
@@ -793,7 +799,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         }
 
         try {
-            sendWithRetries(nodeId, new GridContinuousMessage(MSG_START_ACK, routineId, null, err), null);
+            sendWithRetries(nodeId, new GridContinuousMessage(MSG_START_ACK, routineId, null, err, false), null);
         }
         catch (ClusterTopologyCheckedException ignored) {
             if (log.isDebugEnabled())
@@ -854,7 +860,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         unregisterRemote(routineId);
 
         try {
-            sendWithRetries(nodeId, new GridContinuousMessage(MSG_STOP_ACK, routineId, null, null), null);
+            sendWithRetries(nodeId, new GridContinuousMessage(MSG_STOP_ACK, routineId, null, null, false), null);
         }
         catch (ClusterTopologyCheckedException ignored) {
             if (log.isDebugEnabled())
@@ -917,7 +923,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             if (msg.futureId() != null) {
                 try {
                     sendWithRetries(nodeId,
-                        new GridContinuousMessage(MSG_EVT_ACK, null, msg.futureId(), null),
+                        new GridContinuousMessage(MSG_EVT_ACK, null, msg.futureId(), null, false),
                         null);
                 }
                 catch (IgniteCheckedException e) {
@@ -1015,9 +1021,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
                             Collection<Object> toSnd = t.get1();
 
-                            if (toSnd != null) {
+                            if (toSnd != null && !toSnd.isEmpty()) {
                                 try {
-                                    sendNotification(nodeId, routineId, null, toSnd, hnd.orderedTopic());
+                                    boolean msg = toSnd.iterator().next() instanceof Message;
+
+                                    sendNotification(nodeId, routineId, null, toSnd, hnd.orderedTopic(), msg);
                                 }
                                 catch (ClusterTopologyCheckedException ignored) {
                                     if (log.isDebugEnabled())
@@ -1129,7 +1137,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         assert !F.isEmpty(nodes);
         assert msg != null;
 
-        if (msg.data() != null && (nodes.size() > 1 || !ctx.localNodeId().equals(F.first(nodes).id())))
+        if (!msg.messages() &&
+            msg.data() != null &&
+            (nodes.size() > 1 || !ctx.localNodeId().equals(F.first(nodes).id())))
             msg.dataBytes(marsh.marshal(msg.data()));
 
         for (ClusterNode node : nodes) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index b53386c..8e4c7cd 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -3921,7 +3921,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
     /**
      *
      */
-    public void testCopyOnGet() {
+    public void _testCopyOnGet() {
         IgniteCache<Integer, TestMutableObj> mutObjCache = ignite(0).jcache(null);
         IgniteCache<Integer, TestImmutableObj> immObjCache = ignite(0).jcache(null);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStoreValueBytesSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStoreValueBytesSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStoreValueBytesSelfTest.java
index bd9a683..3871257 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStoreValueBytesSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStoreValueBytesSelfTest.java
@@ -72,27 +72,6 @@ public class GridCacheStoreValueBytesSelfTest extends GridCommonAbstractTest {
      *
      * @throws Exception If failed.
      */
-    public void testDisabled() throws Exception {
-        storeValBytes = false;
-
-        Ignite g0 = startGrid(0);
-        Ignite g1 = startGrid(1);
-
-        IgniteCache<Integer, String> c = g0.jcache(null);
-
-        c.put(1, "Cached value");
-
-        GridCacheEntryEx entry = ((IgniteKernal)g1).internalCache().peekEx(1);
-
-        assert entry != null;
-        assert entry.valueBytes().isNull();
-    }
-
-    /**
-     * JUnit.
-     *
-     * @throws Exception If failed.
-     */
     public void testEnabled() throws Exception {
         storeValBytes = true;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index 40691d8..56eb096 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -607,16 +607,6 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
     }
 
     /** @inheritDoc */
-    @Override public byte[] keyBytes() {
-        assert false; return null;
-    }
-
-    /** @inheritDoc */
-    @Override public byte[] getOrMarshalKeyBytes() {
-        assert false; return null;
-    }
-
-    /** @inheritDoc */
     @Override public GridCacheVersion version() {
         return ver;
     }
@@ -786,18 +776,17 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
     }
 
     /** @inheritDoc */
-    @Override public void keyBytes(byte[] keyBytes) {
+    @Override public CacheObject valueBytes() {
         assert false;
-    }
 
-    /** @inheritDoc */
-    @Override public GridCacheValueBytes valueBytes() {
-        assert false; return GridCacheValueBytes.nil();
+        return null;
     }
 
     /** @inheritDoc */
-    @Override public GridCacheValueBytes valueBytes(GridCacheVersion ver) {
-        assert false; return GridCacheValueBytes.nil();
+    @Override public CacheObject valueBytes(GridCacheVersion ver) {
+        assert false;
+
+        return null;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeCounterSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeCounterSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeCounterSelfTest.java
index fbf4f5f..7f73726 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeCounterSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeCounterSelfTest.java
@@ -270,7 +270,7 @@ public class GridCachePartitionedMultiNodeCounterSelfTest extends GridCommonAbst
 
         assertNotNull(dhtEntry);
 
-        assertEquals(Integer.valueOf(0), dhtEntry.rawGet());
+        assertEquals(Integer.valueOf(0), dhtEntry.rawGet().value(dhtEntry.context(), false));
 
         final AtomicInteger globalCntr = new AtomicInteger(0);
 
@@ -560,7 +560,7 @@ public class GridCachePartitionedMultiNodeCounterSelfTest extends GridCommonAbst
 
         assertNotNull(dhtEntry);
 
-        assertEquals(Integer.valueOf(0), dhtEntry.rawGet());
+        assertEquals(Integer.valueOf(0), dhtEntry.rawGet().value(dhtEntry.context(), false));
 
         startLatchMultiNode = new CountDownLatch(gridCnt);