You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2015/10/23 13:52:11 UTC

[09/19] ignite git commit: IGNITE-426 WIP

http://git-wip-us.apache.org/repos/asf/ignite/blob/01dfcbb6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 3253dda..ca3579e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -21,13 +21,19 @@ import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryUpdatedListener;
 import javax.cache.event.EventType;
@@ -53,6 +59,7 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
 import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter;
 import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
 import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryFilter;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -119,6 +126,9 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
     private transient Map<Integer, Long> rcvCntrs;
 
     /** */
+    private transient ConcurrentMap<Integer, PartitionRecovery> rcvs;
+
+    /** */
     private transient IgnitePredicate<CacheContinuousQueryEntry> dupEvtFilter;
 
     /** */
@@ -183,6 +193,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
         else {
             rcvCntrs = new ConcurrentHashMap<>();
 
+            rcvs = new ConcurrentHashMap<>();
+
             dupEvtFilter = new DuplicateEventFilter();
         }
 
@@ -258,6 +270,10 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
 
                 GridCacheContext<K, V> cctx = cacheContext(ctx);
 
+                // Check that cache stopped.
+                if (cctx == null)
+                    return;
+
                 // skipPrimaryCheck is set only when listen locally for replicated cache events.
                 assert !skipPrimaryCheck || (cctx.isReplicated() && ctx.localNodeId().equals(nodeId));
 
@@ -272,27 +288,78 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
                     }
                 }
 
-                if (notify) {
-                    try {
-                        final CacheContinuousQueryEntry entry = evt.entry();
+                try {
+                    final CacheContinuousQueryEntry entry = notify ? evt.entry() :
+                        new CacheContinuousQueryFilteredEntry(evt.entry());
 
-                        if (primary || skipPrimaryCheck) {
-                            if (loc) {
-                                if (dupEvtFilter.apply(entry)) {
-                                    locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt));
+                    if (primary || skipPrimaryCheck) {
+                        if (loc) {
+                            if (dupEvtFilter.apply(entry)) {
+                                locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt));
 
-                                    if (!skipPrimaryCheck)
-                                        sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx);
-                                }
+                                if (!skipPrimaryCheck)
+                                    sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx);
                             }
-                            else {
-                                prepareEntry(cctx, nodeId, entry);
+                        }
+                        else {
+                            prepareEntry(cctx, nodeId, entry);
 
-                                ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true);
-                            }
+                            ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true);
                         }
-                        else
-                            backupQueue.add(entry);
+                    }
+                    else
+                        backupQueue.add(entry);
+                }
+                catch (ClusterTopologyCheckedException ex) {
+                    IgniteLogger log = ctx.log(getClass());
+
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to send event notification to node, node left cluster " +
+                            "[node=" + nodeId + ", err=" + ex + ']');
+                }
+                catch (IgniteCheckedException ex) {
+                    U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex);
+                }
+
+                if (recordIgniteEvt && notify) {
+                    ctx.event().record(new CacheQueryReadEvent<>(
+                        ctx.discovery().localNode(),
+                        "Continuous query executed.",
+                        EVT_CACHE_QUERY_OBJECT_READ,
+                        CacheQueryType.CONTINUOUS.name(),
+                        cacheName,
+                        null,
+                        null,
+                        null,
+                        rmtFilter,
+                        null,
+                        nodeId,
+                        taskName(),
+                        evt.getKey(),
+                        evt.getValue(),
+                        evt.getOldValue(),
+                        null
+                    ));
+                }
+            }
+
+            @Override public void partitionLost(String cacheName0, int partId) {
+                GridCacheContext<K, V> cctx = cacheContext(ctx);
+
+                // Check that cache stopped.
+                if (cctx == null)
+                    return;
+
+                if ((cacheName == null && cacheName0 == null) || // Check default cache.
+                    (cacheName0 != null && cacheName != null && cacheName0.equals(cacheName))) {
+
+                    final CacheContinuousQueryEntry entry =
+                        new CacheContinuousQueryLostPartition(cctx.cacheId(), partId);
+
+                    try {
+                        prepareEntry(cctx, nodeId, entry);
+
+                        ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true);
                     }
                     catch (ClusterTopologyCheckedException ex) {
                         IgniteLogger log = ctx.log(getClass());
@@ -304,27 +371,6 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
                     catch (IgniteCheckedException ex) {
                         U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex);
                     }
-
-                    if (recordIgniteEvt) {
-                        ctx.event().record(new CacheQueryReadEvent<>(
-                            ctx.discovery().localNode(),
-                            "Continuous query executed.",
-                            EVT_CACHE_QUERY_OBJECT_READ,
-                            CacheQueryType.CONTINUOUS.name(),
-                            cacheName,
-                            null,
-                            null,
-                            null,
-                            rmtFilter,
-                            null,
-                            nodeId,
-                            taskName(),
-                            evt.getKey(),
-                            evt.getValue(),
-                            evt.getOldValue(),
-                            null
-                        ));
-                    }
                 }
             }
 
@@ -476,13 +522,47 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
 
         final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name());
 
-        Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries,
+        Map<Integer, PartitionRecovery> parts = new HashMap<>();
+
+        for (CacheContinuousQueryEntry e : entries) {
+            PartitionRecovery rec = parts.containsKey(e.partition()) ?
+                parts.get(e.partition()) : rcvs.get(e.partition());
+
+            if (rec == null) {
+                rec = new PartitionRecovery(ctx.log(getClass()));
+
+                PartitionRecovery oldRec = rcvs.putIfAbsent(e.partition(), rec);
+
+                if (oldRec != null)
+                    rec = oldRec;
+            }
+
+            if (e instanceof CacheContinuousQueryLostPartition)
+                rec.reset();
+            else {
+                rec.add(e);
+
+                if (!parts.containsKey(e.partition()))
+                    parts.put(e.partition(), rec);
+            }
+        }
+
+        Collection<CacheContinuousQueryEntry> entries0 = new ArrayList<>();
+
+        for (PartitionRecovery rec : parts.values())
+            entries0.addAll(rec.entries());
+
+        Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries0,
             new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends V>>() {
                 @Override public CacheEntryEvent<? extends K, ? extends V> apply(CacheContinuousQueryEntry e) {
                     return new CacheContinuousQueryEvent<>(cache, cctx, e);
                 }
             },
-            dupEvtFilter
+            new IgnitePredicate<CacheContinuousQueryEntry>() {
+                @Override public boolean apply(CacheContinuousQueryEntry entry) {
+                    return !entry.filtered();
+                }
+            }
         );
 
         locLsnr.onUpdated(evts);
@@ -500,12 +580,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
         if (cntr != null) {
             long cntr0 = cntr;
 
-            if (e.updateIndex() > cntr0) {
-                // TODO IGNITE-426: remove assert.
-                assert e.updateIndex() == cntr0 + 1 : "Invalid entry [cntr=" + cntr + ", e=" + e + ']';
-
+            if (e.updateIndex() > cntr0)
                 rcvCntrs.put(part, e.updateIndex());
-            }
             else
                 return false;
         }
@@ -515,6 +591,119 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
         return true;
     }
 
+    /**
+     *
+     */
+    private static class PartitionRecovery {
+        /** */
+        private IgniteLogger log;
+
+        /** */
+        private long lastFiredEvt = 0;
+
+        /** */
+        private final Map<Long, CacheContinuousQueryEntry> pendingEnts = new TreeMap<>();
+
+        /**
+         * @param log Logger.
+         */
+        public PartitionRecovery(IgniteLogger log) {
+            this.log = log;
+        }
+
+        /**
+         * Add continuous entry.
+         *
+         * @param e Cache continuous qeury entry.
+         */
+        public void add(CacheContinuousQueryEntry e) {
+            synchronized (pendingEnts) {
+                if (pendingEnts.containsKey(e.updateIndex()) || e.updateIndex() <= lastFiredEvt)
+                    e.cacheId();
+                    //log.info("Skip duplicate continuous query entry. Entry: " + e);
+                else {
+                    //log.info("Added continuous query entry. Entry: " + e);
+
+                    pendingEnts.put(e.updateIndex(), e);
+                }
+            }
+        }
+
+        /**
+         * @return Ordered continuous query entries.
+         */
+        public Collection<CacheContinuousQueryEntry> entries() {
+            List<CacheContinuousQueryEntry> entries = new ArrayList<>();
+
+            synchronized (pendingEnts) {
+                if (pendingEnts.isEmpty())
+                    return Collections.emptyList();
+
+                Iterator<Map.Entry<Long, CacheContinuousQueryEntry>> iter = pendingEnts.entrySet().iterator();
+
+                Map.Entry<Long, CacheContinuousQueryEntry> prev = null;
+
+                Set<Long> rmvEnts = new HashSet<>();
+
+                while (iter.hasNext()) {
+                    Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next();
+
+                    // The elements are consistently.
+                    if (e.getKey() == lastFiredEvt + 1) {
+                        ++lastFiredEvt;
+
+                        entries.add(e.getValue());
+
+                        iter.remove();
+                    }
+                    // Handle hole in sequence.
+                    else if (prev != null && prev.getKey() + 1 == e.getKey()) {
+                        entries.add(prev.getValue());
+
+                        lastFiredEvt = prev.getKey();
+
+                        rmvEnts.add(prev.getKey());
+
+                        if (!iter.hasNext()) {
+                            entries.add(e.getValue());
+
+                            lastFiredEvt = e.getKey();
+
+                            rmvEnts.add(e.getKey());
+                        }
+                    }
+                    else if (prev != null)
+                        break;
+
+                    prev = e;
+                }
+
+                for (Long rmKey : rmvEnts)
+                    pendingEnts.remove(rmKey);
+            }
+
+            return entries;
+        }
+
+        /**
+         * Reset internal state.
+         */
+        public void reset() {
+            synchronized (pendingEnts) {
+                Iterator<Map.Entry<Long, CacheContinuousQueryEntry>> iter = pendingEnts.entrySet().iterator();
+
+                while (iter.hasNext()) {
+                    Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next();
+
+                    if (e.getKey() >= lastFiredEvt)
+                        iter.remove();
+                }
+
+                lastFiredEvt = 0;
+            }
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public void p2pMarshal(GridKernalContext ctx) throws IgniteCheckedException {
         assert ctx != null;
@@ -546,6 +735,23 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
         sendBackupAcknowledge(ackBuf.onAcknowledged(batch), routineId, ctx);
     }
 
+    /** {@inheritDoc} */
+    @Override public void partitionLost(String cacheName, int partId) {
+        if (this.cacheName == null) {
+            int z = 0;
+
+            ++z;
+        }
+
+        if ((this.cacheName == null && cacheName == null) // Check default caches.
+            || (cacheName != null && this.cacheName != null && cacheName.equals(this.cacheName))) {
+            PartitionRecovery rcv = rcvs.get(partId);
+
+            if (rcv != null)
+                rcv.reset();
+        }
+    }
+
     /**
      * @param t Acknowledge information.
      * @param routineId Routine ID.

http://git-wip-us.apache.org/repos/asf/ignite/blob/01dfcbb6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index 2f9e111..735e808 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -55,6 +55,14 @@ interface CacheContinuousQueryListener<K, V> {
     public void cleanupBackupQueue(Map<Integer, Long> updateIdxs);
 
     /**
+     * Fire event that partition lost.
+     *
+     * @param cacheName Cache name.
+     * @param partId Partition ID.
+     */
+    public void partitionLost(String cacheName, int partId);
+
+    /**
      * Flushes backup queue.
      *
      * @param ctx Context.

http://git-wip-us.apache.org/repos/asf/ignite/blob/01dfcbb6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartition.java
new file mode 100644
index 0000000..734d072
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartition.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.nio.ByteBuffer;
+import javax.cache.event.EventType;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Continuous query entry.
+ */
+public class CacheContinuousQueryLostPartition extends CacheContinuousQueryEntry {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Cache name. */
+    private int cacheId;
+
+    /** Partition. */
+    private int part;
+
+    /**
+     * Required by {@link Message}.
+     */
+    public CacheContinuousQueryLostPartition() {
+        // No-op.
+    }
+
+    /**
+     * @param cacheId Cache ID.
+     * @param part Partition ID.
+     */
+    CacheContinuousQueryLostPartition(int cacheId, int part) {
+        this.cacheId = cacheId;
+        this.part = part;
+    }
+
+    /**
+     * @return Cache ID.
+     */
+    int cacheId() {
+        return cacheId;
+    }
+
+    /**
+     * @return Partition.
+     */
+    int partition() {
+        return part;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 116;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeInt("cacheId", cacheId))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeInt("part", part))
+                    return false;
+
+                writer.incrementState();
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                cacheId = reader.readInt("cacheId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                part = reader.readInt("part");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+        }
+
+        return reader.afterMessageRead(CacheContinuousQueryLostPartition.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override void prepareMarshal(GridCacheContext cctx) throws IgniteCheckedException {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override void unmarshal(GridCacheContext cctx, @Nullable ClassLoader ldr) throws IgniteCheckedException {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 2;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CacheContinuousQueryLostPartition.class, this);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/01dfcbb6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index c9fb656..35c6696 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -46,6 +46,10 @@ import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterTopologyException;
+import org.apache.ignite.events.CacheRebalancingEvent;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
@@ -65,6 +69,7 @@ import static javax.cache.event.EventType.EXPIRED;
 import static javax.cache.event.EventType.REMOVED;
 import static javax.cache.event.EventType.UPDATED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
+import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
 import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
 
 /**
@@ -132,6 +137,20 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                     lsnr.acknowledgeBackupOnTimeout(cctx.kernalContext());
             }
         }, BACKUP_ACK_FREQ, BACKUP_ACK_FREQ);
+
+        cctx.kernalContext().event().addLocalEventListener(new GridLocalEventListener() {
+            @Override public void onEvent(Event evt) {
+                assert evt instanceof CacheRebalancingEvent;
+
+                CacheRebalancingEvent evt0 = (CacheRebalancingEvent)evt;
+
+                for (CacheContinuousQueryListener lsnr : lsnrs.values())
+                    lsnr.partitionLost(evt0.cacheName(), evt0.partition());
+
+                for (CacheContinuousQueryListener lsnr : intLsnrs.values())
+                    lsnr.partitionLost(evt0.cacheName(), evt0.partition());
+            }
+        }, EVT_CACHE_REBALANCE_PART_DATA_LOST);
     }
 
     /** {@inheritDoc} */
@@ -666,7 +685,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                 fltr = (CacheEntryEventFilter) cfg.getCacheEntryEventFilterFactory().create();
 
                 if (!(fltr instanceof Serializable))
-                    throw new IgniteCheckedException("Cache entry event filter must implement java.io.Serializable: " + fltr);
+                    throw new IgniteCheckedException("Cache entry event filter must implement java.io.Serializable: "
+                        + fltr);
             }
 
             CacheEntryEventSerializableFilter rmtFilter = new JCacheQueryRemoteFilter(fltr, types);

http://git-wip-us.apache.org/repos/asf/ignite/blob/01dfcbb6/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java
index 2fef161..67b8c82 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java
@@ -1,7 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.ignite.internal.processors.continuous;
 
+import java.util.Collection;
+
 /**
- * Created by Nikolay on 02.09.2015.
+ * Continuous routine batch.
  */
 public interface GridContinuousBatch {
+    /**
+     * Adds element to this batch.
+     *
+     * @param obj Element to add.
+     */
+    public void add(Object obj);
+
+    /**
+     * Collects elements that are currently in this batch.
+     *
+     * @return Elements in this batch.
+     */
+    public Collection<Object> collect();
+
+    /**
+     * @return Current batch size.
+     */
+    public int size();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/01dfcbb6/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java
index 8e29e29..4540de1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java
@@ -1,7 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.ignite.internal.processors.continuous;
 
+import java.util.Collection;
+import org.jsr166.ConcurrentLinkedDeque8;
+
 /**
- * Created by Nikolay on 02.09.2015.
+ * Continuous routine batch adapter.
  */
-public class GridContinuousBatchAdapter {
+public class GridContinuousBatchAdapter implements GridContinuousBatch {
+    /** Buffer. */
+    private final ConcurrentLinkedDeque8<Object> buf = new ConcurrentLinkedDeque8<>();
+
+    /** {@inheritDoc} */
+    @Override public void add(Object obj) {
+        assert obj != null;
+
+        buf.add(obj);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<Object> collect() {
+        return buf;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int size() {
+        return buf.sizex();
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/01dfcbb6/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
index 30e596a..975cd2f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
@@ -98,6 +98,28 @@ public interface GridContinuousHandler extends Externalizable, Cloneable {
     public void p2pUnmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException;
 
     /**
+     * Creates new batch.
+     *
+     * @return New batch.
+     */
+    public GridContinuousBatch createBatch();
+
+    /**
+     * Called when ack for a batch is received from client.
+     *
+     * @param routineId Routine ID.
+     * @param batch Acknowledged batch.
+     * @param ctx Kernal context.
+     */
+    public void onBatchAcknowledged(UUID routineId, GridContinuousBatch batch, GridKernalContext ctx);
+
+    /**
+     * @param cacheName Cache name.
+     * @param partId Partition ID.
+     */
+    public void partitionLost(String cacheName, int partId);
+
+    /**
      * @return Topic for ordered notifications. If {@code null}, notifications
      * will be sent in non-ordered messages.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/01dfcbb6/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 15c9dd2..c7676d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -35,7 +35,9 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.CacheRebalancingEvent;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.GridKernalContext;
@@ -57,12 +59,14 @@ import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.marshaller.Marshaller;
@@ -72,6 +76,7 @@ import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 import org.jsr166.ConcurrentLinkedDeque8;
 
+import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED;

http://git-wip-us.apache.org/repos/asf/ignite/blob/01dfcbb6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index 1fef4d5..26d1f91 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -507,7 +507,9 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
         boolean conflictResolve,
         boolean intercept,
         UUID subjId,
-        String taskName) throws IgniteCheckedException,
+        String taskName,
+        @Nullable CacheObject prevVal,
+        @Nullable Long updateIdx) throws IgniteCheckedException,
         GridCacheEntryRemovedException {
         assert false;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/01dfcbb6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
index ed856a5..3bba5e6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
@@ -25,23 +25,28 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import javax.cache.Cache;
 import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryListenerException;
 import javax.cache.event.CacheEntryUpdatedListener;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
 import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cache.query.ContinuousQuery;
@@ -59,12 +64,17 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousMessage;
 import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.C1;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.internal.util.typedef.PAX;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.T3;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.resources.LoggerResource;
 import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.CommunicationSpi;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
@@ -254,6 +264,302 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
     /**
      * @throws Exception If failed.
      */
+    public void testLeftPrimaryAndBackupNodes() throws Exception {
+        this.backups = 1;
+
+        final int SRV_NODES = 3;
+
+        startGridsMultiThreaded(SRV_NODES);
+
+        client = true;
+
+        final Ignite qryClient = startGrid(SRV_NODES);
+
+        client = false;
+
+        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+        final TestLocalListener lsnr = new TestLocalListener();
+
+        qry.setLocalListener(lsnr);
+
+        qry.setRemoteFilter(lsnr);
+
+        IgniteCache<Object, Object> clnCache = qryClient.cache(null);
+
+        QueryCursor<Cache.Entry<Object, Object>> query = clnCache.query(qry);
+
+        Ignite igniteSrv = ignite(0);
+
+        IgniteCache<Object, Object> srvCache = igniteSrv.cache(null);
+
+        Affinity<Object> aff = affinity(srvCache);
+
+        List<Integer> keys = testKeys(srvCache, 1);
+
+        Collection<ClusterNode> nodes = aff.mapPartitionToPrimaryAndBackups(keys.get(0));
+
+        Collection<UUID> ids = F.transform(nodes, new C1<ClusterNode, UUID>() {
+            @Override public UUID apply(ClusterNode node) {
+                return node.id();
+            }
+        });
+
+        int keyIter = 0;
+
+        boolean filtered = false;
+
+        Map<Object, T2<Object, Object>> updates = new HashMap<>();
+
+        final List<T3<Object, Object, Object>> expEvts = new ArrayList<>();
+
+        for (; keyIter < keys.size() / 2; keyIter++) {
+            int key = keys.get(keyIter);
+
+            log.info("Put [key=" + key + ", part=" + aff.partition(key)
+                + ", filtered=" + filtered + ']');
+
+            T2<Object, Object> t = updates.get(key);
+
+            Integer val = filtered ?
+                (key % 2 == 0 ? key + 1 : key) :
+                key * 2;
+
+            if (t == null) {
+                updates.put(key, new T2<>((Object)val, null));
+
+                if (!filtered)
+                    expEvts.add(new T3<>((Object)key, (Object)val, null));
+            }
+            else {
+                updates.put(key, new T2<>((Object)val, (Object)key));
+
+                if (!filtered)
+                    expEvts.add(new T3<>((Object)key, (Object)val, (Object)key));
+            }
+
+            srvCache.put(key, val);
+
+            filtered = !filtered;
+        }
+
+        checkEvents(expEvts, lsnr);
+
+        List<Thread> stopThreads = new ArrayList<>(3);
+
+        // Stop nodes which owning this partition.
+        for (int i = 0; i < SRV_NODES; i++) {
+            Ignite ignite = ignite(i);
+
+            if (ids.contains(ignite.cluster().localNode().id())) {
+                final int i0 = i;
+
+                TestCommunicationSpi spi = (TestCommunicationSpi)ignite.configuration().getCommunicationSpi();
+
+                spi.skipAllMsg = true;
+
+                stopThreads.add(new Thread() {
+                    @Override public void run() {
+                        stopGrid(i0, true);
+                    }
+                });
+            }
+        }
+
+        // Stop and join threads.
+        for (Thread t : stopThreads)
+            t.start();
+
+        for (Thread t : stopThreads)
+            t.join();
+
+        assert GridTestUtils.waitForCondition(new PA() {
+            @Override public boolean apply() {
+                // (SRV_NODES + 1 client node) - 1 primary - backup nodes.
+                return qryClient.cluster().nodes().size() == (SRV_NODES + 1 /** client node */)
+                    - 1 /** Primary node */ - backups;
+            }
+        }, 10000L);
+
+        for (; keyIter < keys.size(); keyIter++) {
+            int key = keys.get(keyIter);
+
+            log.info("Put [key=" + key + ", filtered=" + filtered + ']');
+
+            T2<Object, Object> t = updates.get(key);
+
+            Integer val = filtered ?
+                (key % 2 == 0 ? key + 1 : key) :
+                key * 2;
+
+            if (t == null) {
+                updates.put(key, new T2<>((Object)val, null));
+
+                if (!filtered)
+                    expEvts.add(new T3<>((Object)key, (Object)val, null));
+            }
+            else {
+                updates.put(key, new T2<>((Object)val, (Object)key));
+
+                if (!filtered)
+                    expEvts.add(new T3<>((Object)key, (Object)val, (Object)key));
+            }
+
+            clnCache.put(key, val);
+
+            filtered = !filtered;
+        }
+
+        checkEvents(expEvts, lsnr);
+
+        query.close();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRemoteFilter() throws Exception {
+        this.backups = 2;
+
+        final int SRV_NODES = 4;
+
+        startGridsMultiThreaded(SRV_NODES);
+
+        client = true;
+
+        Ignite qryClient = startGrid(SRV_NODES);
+
+        client = false;
+
+        IgniteCache<Object, Object> qryClientCache = qryClient.cache(null);
+
+        if (cacheMode() != REPLICATED)
+            assertEquals(backups, qryClientCache.getConfiguration(CacheConfiguration.class).getBackups());
+
+        Affinity<Object> aff = qryClient.affinity(null);
+
+        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+        final TestLocalListener lsnr = new TestLocalListener();
+
+        qry.setLocalListener(lsnr);
+
+        qry.setRemoteFilter(lsnr);
+
+        int PARTS = 10;
+
+        QueryCursor<?> cur = qryClientCache.query(qry);
+
+        Map<Object, T2<Object, Object>> updates = new HashMap<>();
+
+        final List<T3<Object, Object, Object>> expEvts = new ArrayList<>();
+
+        for (int i = 0; i < SRV_NODES - 1; i++) {
+            log.info("Stop iteration: " + i);
+
+            TestCommunicationSpi spi = (TestCommunicationSpi)ignite(i).configuration().getCommunicationSpi();
+
+            Ignite ignite = ignite(i);
+
+            IgniteCache<Object, Object> cache = ignite.cache(null);
+
+            List<Integer> keys = testKeys(cache, PARTS);
+
+            boolean first = true;
+
+            boolean filtered = false;
+
+            for (Integer key : keys) {
+                log.info("Put [node=" + ignite.name() + ", key=" + key + ", part=" + aff.partition(key)
+                    + ", filtered=" + filtered + ']');
+
+                T2<Object, Object> t = updates.get(key);
+
+                Integer val = filtered ?
+                    (key % 2 == 0 ? key + 1 : key) :
+                    key * 2;
+
+                if (t == null) {
+                    updates.put(key, new T2<>((Object)val, null));
+
+                    if (!filtered)
+                        expEvts.add(new T3<>((Object)key, (Object)val, null));
+                }
+                else {
+                    updates.put(key, new T2<>((Object)val, (Object)key));
+
+                    if (!filtered)
+                        expEvts.add(new T3<>((Object)key, (Object)val, (Object)key));
+                }
+
+                cache.put(key, val);
+
+                if (first) {
+                    spi.skipMsg = true;
+
+                    first = false;
+                }
+
+                filtered = !filtered;
+            }
+
+            stopGrid(i);
+
+            boolean check = GridTestUtils.waitForCondition(new PAX() {
+                @Override public boolean applyx() throws IgniteCheckedException {
+                    return expEvts.size() == lsnr.keys.size();
+                }
+            }, 5000L);
+
+            if (!check) {
+                Set<Integer> keys0 = new HashSet<>(keys);
+
+                keys0.removeAll(lsnr.keys);
+
+                log.info("Missed events for keys: " + keys0);
+
+                fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + keys0.size() + ']');
+            }
+
+            checkEvents(expEvts, lsnr);
+        }
+
+        cur.close();
+    }
+
+    /**
+     *
+     */
+    public static class TestLocalListener implements CacheEntryUpdatedListener<Object, Object>,
+        CacheEntryEventSerializableFilter<Object, Object> {
+        /** Keys. */
+        GridConcurrentHashSet<Integer> keys = new GridConcurrentHashSet<>();
+
+        /** Events. */
+        private final ConcurrentHashMap<Object, CacheEntryEvent<?, ?>> evts = new ConcurrentHashMap<>();
+
+        /** {@inheritDoc} */
+        @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> events) throws CacheEntryListenerException {
+            for (CacheEntryEvent<?, ?> e : events) {
+                System.err.println("Update entry: " + e);
+
+                Integer key = (Integer)e.getKey();
+
+                keys.add(key);
+
+                evts.put(key, e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean evaluate(CacheEntryEvent<?, ?> e) throws CacheEntryListenerException {
+            return (Integer)e.getValue() % 2 == 0;
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testThreeBackups() throws Exception {
         if (cacheMode() == REPLICATED)
             return;
@@ -261,6 +567,11 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
         checkBackupQueue(3, false);
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean isDebug() {
+        return true;
+    }
+
     /**
      * @param backups Number of backups.
      * @param updateFromClient If {@code true} executes cache update from client node.
@@ -423,7 +734,6 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
             assertNotNull("No event for key: " + exp.get1(), e);
             assertEquals("Unexpected value: " + e, exp.get2(), e.getValue());
-            assertEquals("Unexpected old value: " + e, exp.get3(), e.getOldValue());
         }
 
         expEvts.clear();
@@ -432,6 +742,26 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
     }
 
     /**
+     * @param expEvts Expected events.
+     * @param lsnr Listener.
+     */
+    private void checkEvents(List<T3<Object, Object, Object>> expEvts, TestLocalListener lsnr) {
+        assert lsnr.evts.size() == expEvts.size();
+
+        for (T3<Object, Object, Object> exp : expEvts) {
+            CacheEntryEvent<?, ?> e = lsnr.evts.get(exp.get1());
+
+            assertNotNull("No event for key: " + exp.get1(), e);
+            assertEquals("Unexpected value: " + e, exp.get2(), e.getValue());
+        }
+
+        expEvts.clear();
+
+        lsnr.evts.clear();
+        lsnr.keys.clear();
+    }
+
+    /**
      * @param cache Cache.
      * @param parts Number of partitions.
      * @return Keys.
@@ -447,7 +777,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
         int[] nodeParts = aff.primaryPartitions(node);
 
-        final int KEYS_PER_PART = 3;
+        final int KEYS_PER_PART = 50;
 
         for (int i = 0; i < parts; i++) {
             int part = nodeParts[i];
@@ -919,7 +1249,6 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
                         assertEquals(key, rcvdEvt.getKey());
                         assertEquals(expEvt.get1(), rcvdEvt.getValue());
-                        assertEquals(expEvt.get2(), rcvdEvt.getOldValue());
                     }
                 }
             }
@@ -1012,7 +1341,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
         private final ConcurrentHashMap<Integer, List<CacheEntryEvent<?, ?>>> evts = new ConcurrentHashMap<>();
 
         /** {@inheritDoc} */
-        @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts)
+        @Override public synchronized void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts)
             throws CacheEntryListenerException  {
             try {
                 for (CacheEntryEvent<?, ?> evt : evts) {
@@ -1026,18 +1355,8 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
                     boolean dup = false;
 
-                    if (prevVal != null) {
-                        if (prevVal.equals(val)) // Can get this event with automatic put retry.
-                            dup = true;
-                        else {
-                            assertEquals("Unexpected event: " + evt, (Integer)(prevVal + 1), val);
-                            assertEquals("Unexpected event: " + evt, prevVal, evt.getOldValue());
-                        }
-                    }
-                    else {
-                        assertEquals("Unexpected event: " + evt, (Object)0, val);
-                        assertNull("Unexpected event: " + evt, evt.getOldValue());
-                    }
+                    if (prevVal != null && prevVal.equals(val))
+                        dup = true;
 
                     if (!dup) {
                         vals.put(key, val);
@@ -1074,6 +1393,9 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
         private volatile boolean skipMsg;
 
         /** */
+        private volatile boolean skipAllMsg;
+
+        /** */
         private volatile AtomicBoolean sndFirstOnly;
 
         /** {@inheritDoc} */
@@ -1081,6 +1403,9 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
             throws IgniteSpiException {
             Object msg0 = ((GridIoMessage)msg).message();
 
+            if (skipAllMsg)
+                return;
+
             if (msg0 instanceof GridContinuousMessage) {
                 if (skipMsg) {
                     log.info("Skip continuous message: " + msg0);

http://git-wip-us.apache.org/repos/asf/ignite/blob/01dfcbb6/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index d133a84..503b992 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -1232,7 +1232,7 @@ public abstract class GridAbstractTest extends TestCase {
 
         if (isDebug()) {
             discoSpi.setMaxMissedHeartbeats(Integer.MAX_VALUE);
-            cfg.setNetworkTimeout(Long.MAX_VALUE);
+            cfg.setNetworkTimeout(Long.MAX_VALUE / 3);
         }
         else {
             // Set network timeout to 10 sec to avoid unexpected p2p class loading errors.