You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/11/20 14:33:40 UTC

[18/24] ignite git commit: IGNITE-426 Implemented failover for Continuous query.

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index c7bf091..b2e7490 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -23,6 +23,7 @@ import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Map;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
@@ -46,12 +47,14 @@ import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
+import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgnitePredicate;
@@ -82,6 +85,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
     /** */
     private static final byte EXPIRED_FLAG = 0b1000;
 
+    /** Value used for both timing arguments when {@link BackupCleaner} is scheduled (presumably milliseconds). */
+    private static final long BACKUP_ACK_FREQ = 5000;
+
     /** Listeners. */
     private final ConcurrentMap<UUID, CacheContinuousQueryListener> lsnrs = new ConcurrentHashMap8<>();
 
@@ -108,6 +114,18 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
     @Override protected void start0() throws IgniteCheckedException {
         // Append cache name to the topic.
         topicPrefix = "CONTINUOUS_QUERY" + (cctx.name() == null ? "" : "_" + cctx.name());
+
+        cctx.io().addHandler(cctx.cacheId(), CacheContinuousQueryBatchAck.class,
+            new CI2<UUID, CacheContinuousQueryBatchAck>() {
+                @Override public void apply(UUID uuid, CacheContinuousQueryBatchAck msg) {
+                    CacheContinuousQueryListener lsnr = lsnrs.get(msg.routineId());
+
+                    if (lsnr != null)
+                        lsnr.cleanupBackupQueue(msg.updateCntrs());
+                }
+            });
+
+        cctx.time().schedule(new BackupCleaner(lsnrs, cctx.kernalContext()), BACKUP_ACK_FREQ, BACKUP_ACK_FREQ);
     }
 
     /** {@inheritDoc} */
@@ -137,25 +155,55 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
     }
 
     /**
-     * @param e Cache entry.
+     * @param partId Partition id.
+     * @param updCntr Update counter.
+     * @param topVer Topology version.
+     */
+    public void skipUpdateEvent(KeyCacheObject key, int partId, long updCntr, AffinityTopologyVersion topVer) {
+        if (lsnrCnt.get() > 0) {
+            CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry(
+                cctx.cacheId(),
+                UPDATED,
+                key,
+                null,
+                null,
+                partId,
+                updCntr,
+                topVer);
+
+            CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
+                    cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
+
+            for (CacheContinuousQueryListener lsnr : lsnrs.values())
+                lsnr.skipUpdateEvent(evt, topVer);
+        }
+    }
+
+    /**
      * @param key Key.
      * @param newVal New value.
      * @param oldVal Old value.
+     * @param internal Internal entry (internal key or not user cache).
+     * @param primary {@code True} if called on primary node.
      * @param preload Whether update happened during preloading.
+     * @param updateCntr Update counter.
+     * @param topVer Topology version.
      * @throws IgniteCheckedException In case of error.
      */
-    public void onEntryUpdated(GridCacheEntryEx e,
+    public void onEntryUpdated(
         KeyCacheObject key,
         CacheObject newVal,
         CacheObject oldVal,
-        boolean preload)
+        boolean internal,
+        int partId,
+        boolean primary,
+        boolean preload,
+        long updateCntr,
+        AffinityTopologyVersion topVer)
         throws IgniteCheckedException
     {
-        assert e != null;
         assert key != null;
 
-        boolean internal = e.isInternal() || !e.context().userCache();
-
         if (preload && !internal)
             return;
 
@@ -179,8 +227,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
 
         boolean initialized = false;
 
-        boolean primary = cctx.affinity().primary(cctx.localNode(), key, AffinityTopologyVersion.NONE);
-        boolean recordIgniteEvt = !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
+        boolean recordIgniteEvt = primary && !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
 
         for (CacheContinuousQueryListener lsnr : lsnrCol.values()) {
             if (preload && !lsnr.notifyExisting())
@@ -205,7 +252,10 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                 evtType,
                 key,
                 newVal,
-                lsnr.oldValueRequired() ? oldVal : null);
+                lsnr.oldValueRequired() ? oldVal : null,
+                partId,
+                updateCntr,
+                topVer);
 
             CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
                 cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
@@ -250,12 +300,15 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                     initialized = true;
                 }
 
-               CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry(
-                   cctx.cacheId(),
-                   EXPIRED,
-                   key,
-                   null,
-                   lsnr.oldValueRequired() ? oldVal : null);
+                CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry(
+                    cctx.cacheId(),
+                    EXPIRED,
+                    key,
+                    null,
+                    lsnr.oldValueRequired() ? oldVal : null,
+                    e.partition(),
+                    -1,
+                    null);
 
                 CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent(
                     cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
@@ -373,6 +426,27 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
     }
 
     /**
+     * @param topVer Topology version.
+     */
+    public void beforeExchange(AffinityTopologyVersion topVer) {
+        for (CacheContinuousQueryListener lsnr : lsnrs.values())
+            lsnr.flushBackupQueue(cctx.kernalContext(), topVer);
+    }
+
+    /**
+     * Partition evicted callback.
+     *
+     * @param part Partition number.
+     */
+    public void onPartitionEvicted(int part) {
+        for (CacheContinuousQueryListener lsnr : lsnrs.values())
+            lsnr.onPartitionEvicted(part);
+
+        for (CacheContinuousQueryListener lsnr : intLsnrs.values())
+            lsnr.onPartitionEvicted(part);
+    }
+
+    /**
      * @param locLsnr Local listener.
      * @param rmtFilter Remote filter.
      * @param bufSize Buffer size.
@@ -417,7 +491,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
             sync,
             ignoreExpired,
             taskNameHash,
-            skipPrimaryCheck);
+            skipPrimaryCheck,
+            cctx.isLocal());
 
         IgnitePredicate<ClusterNode> pred = (loc || cctx.config().getCacheMode() == CacheMode.LOCAL) ?
             F.nodeForNodeId(cctx.localNodeId()) : F.<ClusterNode>alwaysTrue();
@@ -469,10 +544,19 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
 
                                 GridCacheEntryEx e = it.next();
 
+                                CacheContinuousQueryEntry entry = new CacheContinuousQueryEntry(
+                                    cctx.cacheId(),
+                                    CREATED,
+                                    e.key(),
+                                    e.rawGet(),
+                                    null,
+                                    0,
+                                    -1,
+                                    null);
+
                                 next = new CacheContinuousQueryEvent<>(
                                     cctx.kernalContext().cache().jcache(cctx.name()),
-                                    cctx,
-                                    new CacheContinuousQueryEntry(cctx.cacheId(), CREATED, e.key(), e.rawGet(), null));
+                                    cctx, entry);
 
                                 if (rmtFilter != null && !rmtFilter.evaluate(next))
                                     next = null;
@@ -590,10 +674,11 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
             CacheEntryEventFilter fltr = null;
 
             if (cfg.getCacheEntryEventFilterFactory() != null) {
-                fltr = (CacheEntryEventFilter) cfg.getCacheEntryEventFilterFactory().create();
+                fltr = (CacheEntryEventFilter)cfg.getCacheEntryEventFilterFactory().create();
 
                 if (!(fltr instanceof Serializable))
-                    throw new IgniteCheckedException("Cache entry event filter must implement java.io.Serializable: " + fltr);
+                    throw new IgniteCheckedException("Cache entry event filter must implement java.io.Serializable: "
+                        + fltr);
             }
 
             CacheEntryEventSerializableFilter rmtFilter = new JCacheQueryRemoteFilter(fltr, types);
@@ -637,6 +722,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
 
         /**
          * @param impl Listener.
+         * @param log Logger.
          */
         JCacheQueryLocalListener(CacheEntryListener<K, V> impl, IgniteLogger log) {
             assert impl != null;
@@ -789,4 +875,29 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
             }
         }
     }
+
+    /**
+     * Task that calls {@code acknowledgeBackupOnTimeout} on every listener in the supplied map.
+     */
+    private static final class BackupCleaner implements Runnable {
+        /** Listeners. */
+        private final Map<UUID, CacheContinuousQueryListener> lsnrs;
+
+        /** Kernal context. */
+        private final GridKernalContext ctx;
+
+        /**
+         * @param lsnrs Listeners.
+         */
+        public BackupCleaner(Map<UUID, CacheContinuousQueryListener> lsnrs, GridKernalContext ctx) {
+            this.lsnrs = lsnrs;
+            this.ctx = ctx;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            for (CacheContinuousQueryListener lsnr : lsnrs.values())
+                lsnr.acknowledgeBackupOnTimeout(ctx);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 23f83be..ff413d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -181,6 +181,10 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
      */
     private byte flags;
 
+    /** Partition update counter. */
+    @GridDirectTransient
+    private long partUpdateCntr;
+
     /** */
     private GridCacheVersion serReadVer;
 
@@ -373,6 +377,22 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
     }
 
     /**
+     * Sets partition update counter.
+     *
+     * @param partCntr Partition update counter.
+     */
+    public void updateCounter(long partCntr) {
+        this.partUpdateCntr = partCntr;
+    }
+
+    /**
+     * @return Partition update counter.
+     */
+    public long updateCounter() {
+        return partUpdateCntr;
+    }
+
+    /**
      * @param val Value to set.
      */
     void setAndMarkValid(CacheObject val) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index a9846ef..63a4cbe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -964,6 +964,9 @@ public class IgniteTxHandler {
                 // Complete remote candidates.
                 tx.doneRemote(req.baseVersion(), null, null, null);
 
+                tx.setPartitionUpdateCounters(
+                    req.partUpdateCounters() != null ? req.partUpdateCounters().array() : null);
+
                 tx.commit();
             }
             else {

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index ecb0595..cff62d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -1012,7 +1012,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                             cached.isNear() ? null : explicitVer,
                                             CU.subjectId(this, cctx),
                                             resolveTaskName(),
-                                            dhtVer);
+                                            dhtVer,
+                                            null);
+
+                                        if (updRes.success())
+                                            txEntry.updateCounter(updRes.updatePartitionCounter());
 
                                         if (nearCached != null && updRes.success()) {
                                             nearCached.innerSet(
@@ -1032,7 +1036,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                                 null,
                                                 CU.subjectId(this, cctx),
                                                 resolveTaskName(),
-                                                dhtVer);
+                                                dhtVer,
+                                                null);
                                         }
                                     }
                                     else if (op == DELETE) {
@@ -1049,7 +1054,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                             cached.isNear() ? null : explicitVer,
                                             CU.subjectId(this, cctx),
                                             resolveTaskName(),
-                                            dhtVer);
+                                            dhtVer,
+                                            null);
+
+                                        if (updRes.success())
+                                            txEntry.updateCounter(updRes.updatePartitionCounter());
 
                                         if (nearCached != null && updRes.success()) {
                                             nearCached.innerRemove(
@@ -1065,7 +1074,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                                 null,
                                                 CU.subjectId(this, cctx),
                                                 resolveTaskName(),
-                                                dhtVer);
+                                                dhtVer,
+                                                null);
                                         }
                                     }
                                     else if (op == RELOAD) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java
index b80909f..8ceca3f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java
@@ -34,4 +34,9 @@ public interface IgniteTxRemoteEx extends IgniteInternalTx {
         Collection<GridCacheVersion> committedVers,
         Collection<GridCacheVersion> rolledbackVers,
         Collection<GridCacheVersion> pendingVers);
-}
\ No newline at end of file
+
+    /**
+     * @param cntrs Partition update counters.
+     */
+    public void setPartitionUpdateCounters(long[] cntrs);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java
new file mode 100644
index 0000000..67b8c82
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import java.util.Collection;
+
+/**
+ * Continuous routine batch.
+ */
+public interface GridContinuousBatch {
+    /**
+     * Adds element to this batch.
+     *
+     * @param obj Element to add.
+     */
+    public void add(Object obj);
+
+    /**
+     * Collects elements that are currently in this batch.
+     *
+     * @return Elements in this batch.
+     */
+    public Collection<Object> collect();
+
+    /**
+     * @return Current batch size.
+     */
+    public int size();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java
new file mode 100644
index 0000000..4540de1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import java.util.Collection;
+import org.jsr166.ConcurrentLinkedDeque8;
+
+/**
+ * Continuous routine batch adapter backed by a {@link ConcurrentLinkedDeque8} buffer.
+ */
+public class GridContinuousBatchAdapter implements GridContinuousBatch {
+    /** Buffer. */
+    private final ConcurrentLinkedDeque8<Object> buf = new ConcurrentLinkedDeque8<>();
+
+    /** {@inheritDoc} */
+    @Override public void add(Object obj) {
+        assert obj != null;
+
+        buf.add(obj);
+    }
+
+    /** {@inheritDoc} Returns the internal buffer itself, not a copy. */
+    @Override public Collection<Object> collect() {
+        return buf;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int size() {
+        return buf.sizex();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
index 30e596a..d8698b3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.continuous;
 
 import java.io.Externalizable;
 import java.util.Collection;
+import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridKernalContext;
@@ -98,6 +99,22 @@ public interface GridContinuousHandler extends Externalizable, Cloneable {
     public void p2pUnmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException;
 
     /**
+     * Creates new batch.
+     *
+     * @return New batch.
+     */
+    public GridContinuousBatch createBatch();
+
+    /**
+     * Called when ack for a batch is received from client.
+     *
+     * @param routineId Routine ID.
+     * @param batch Acknowledged batch.
+     * @param ctx Kernal context.
+     */
+    public void onBatchAcknowledged(UUID routineId, GridContinuousBatch batch, GridKernalContext ctx);
+
+    /**
      * @return Topic for ordered notifications. If {@code null}, notifications
      * will be sent in non-ordered messages.
      */
@@ -129,4 +146,9 @@ public interface GridContinuousHandler extends Externalizable, Cloneable {
      * @return Cache name if this is a continuous query handler.
      */
     public String cacheName();
+
+    /**
+     * @param cntrs Initial state of the partition update counters.
+     */
+    public void updateCounters(Map<Integer, Long> cntrs);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index d1cb3a9..c07cc13 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -35,6 +35,7 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
@@ -52,17 +53,21 @@ import org.apache.ignite.internal.managers.discovery.CustomEventListener;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.marshaller.Marshaller;
@@ -70,7 +75,6 @@ import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
-import org.jsr166.ConcurrentLinkedDeque8;
 
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
@@ -203,8 +207,38 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                     StartFuture fut = startFuts.remove(msg.routineId());
 
                     if (fut != null) {
-                        if (msg.errs().isEmpty())
+                        if (msg.errs().isEmpty()) {
+                            LocalRoutineInfo routine = locInfos.get(msg.routineId());
+
+                            if (routine != null) {
+                                try {
+                                    Map<Integer, Long> cntrs = msg.updateCounters();
+
+                                    GridCacheAdapter<Object, Object> interCache =
+                                        ctx.cache().internalCache(routine.handler().cacheName());
+
+                                    if (interCache != null && cntrs != null && interCache.context() != null
+                                        && !interCache.isLocal() && !CU.clientNode(ctx.grid().localNode())) {
+                                        Map<Integer, Long> map = interCache.context().topology().updateCounters();
+
+                                        for (Map.Entry<Integer, Long> e : map.entrySet()) {
+                                            Long cntr0 = cntrs.get(e.getKey());
+                                            Long cntr1 = e.getValue();
+
+                                            if (cntr0 == null || cntr1 > cntr0)
+                                                cntrs.put(e.getKey(), cntr1);
+                                        }
+                                    }
+                                }
+                                catch (Exception e) {
+                                    U.warn(log, "Failed to load update counters.", e);
+                                }
+
+                                routine.handler().updateCounters(msg.updateCounters());
+                            }
+
                             fut.onRemoteRegistered();
+                        }
                         else {
                             IgniteCheckedException firstEx = F.first(msg.errs().values());
 
@@ -651,6 +685,30 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     /**
      * @param nodeId ID of the node that started routine.
      * @param routineId Routine ID.
+     * @param objs Notification objects.
+     * @param orderedTopic Topic for ordered notifications. If {@code null}, non-ordered message will be sent.
+     * @throws IgniteCheckedException In case of error.
+     */
+    public void addBackupNotification(UUID nodeId,
+        final UUID routineId,
+        Collection<?> objs,
+        @Nullable Object orderedTopic)
+        throws IgniteCheckedException {
+        if (processorStopped)
+            return;
+
+        final RemoteRoutineInfo info = rmtInfos.get(routineId);
+
+        if (info != null) {
+            final GridContinuousBatch batch = info.addAll(objs);
+
+            sendNotification(nodeId, routineId, null, batch.collect(), orderedTopic, true, null);
+        }
+    }
+
+    /**
+     * @param nodeId ID of the node that started routine.
+     * @param routineId Routine ID.
      * @param obj Notification object.
      * @param orderedTopic Topic for ordered notifications. If {@code null}, non-ordered message will be sent.
      * @param sync If {@code true} then waits for event acknowledgment.
@@ -658,7 +716,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
      * @throws IgniteCheckedException In case of error.
      */
     public void addNotification(UUID nodeId,
-        UUID routineId,
+        final UUID routineId,
         @Nullable Object obj,
         @Nullable Object orderedTopic,
         boolean sync,
@@ -673,7 +731,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         if (processorStopped)
             return;
 
-        RemoteRoutineInfo info = rmtInfos.get(routineId);
+        final RemoteRoutineInfo info = rmtInfos.get(routineId);
 
         if (info != null) {
             assert info.interval == 0 || !sync;
@@ -686,7 +744,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 syncMsgFuts.put(futId, fut);
 
                 try {
-                    sendNotification(nodeId, routineId, futId, F.asList(obj), orderedTopic, msg);
+                    sendNotification(nodeId, routineId, futId, F.asList(obj), orderedTopic, msg, null);
                 }
                 catch (IgniteCheckedException e) {
                     syncMsgFuts.remove(futId);
@@ -697,10 +755,18 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 fut.get();
             }
             else {
-                Collection<Object> toSnd = info.add(obj);
+                final GridContinuousBatch batch = info.add(obj);
+
+                if (batch != null) {
+                    CI1<IgniteException> ackC = new CI1<IgniteException>() {
+                        @Override public void apply(IgniteException e) {
+                            if (e == null)
+                                info.hnd.onBatchAcknowledged(routineId, batch, ctx);
+                        }
+                    };
 
-                if (toSnd != null)
-                    sendNotification(nodeId, routineId, null, toSnd, orderedTopic, msg);
+                    sendNotification(nodeId, routineId, null, batch.collect(), orderedTopic, msg, ackC);
+                }
             }
         }
     }
@@ -725,6 +791,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
      * @param orderedTopic Topic for ordered notifications.
      *      If {@code null}, non-ordered message will be sent.
      * @param msg If {@code true} then sent data is collection of messages.
+     * @param ackC Ack closure.
      * @throws IgniteCheckedException In case of error.
      */
     private void sendNotification(UUID nodeId,
@@ -732,7 +799,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         @Nullable IgniteUuid futId,
         Collection<Object> toSnd,
         @Nullable Object orderedTopic,
-        boolean msg) throws IgniteCheckedException {
+        boolean msg,
+        IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
         assert nodeId != null;
         assert routineId != null;
         assert toSnd != null;
@@ -740,7 +808,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
         sendWithRetries(nodeId,
             new GridContinuousMessage(MSG_EVT_NOTIFICATION, routineId, futId, toSnd, msg),
-            orderedTopic);
+            orderedTopic,
+            ackC);
     }
 
     /**
@@ -819,6 +888,18 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             }
         }
 
+        try {
+            if (ctx.cache() != null && ctx.cache().internalCache(hnd.cacheName()) != null) {
+                Map<Integer, Long> cntrs = ctx.cache().internalCache(hnd.cacheName())
+                    .context().topology().updateCounters();
+
+                req.addUpdateCounters(cntrs);
+            }
+        }
+        catch (Exception e) {
+            U.warn(log, "Failed to load partition counters.");
+        }
+
         if (err != null)
             req.addError(ctx.localNodeId(), err);
 
@@ -859,6 +940,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 try {
                     sendWithRetries(nodeId,
                         new GridContinuousMessage(MSG_EVT_ACK, null, msg.futureId(), null, false),
+                        null,
                         null);
                 }
                 catch (IgniteCheckedException e) {
@@ -922,15 +1004,30 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                                 break;
                             }
 
-                            IgniteBiTuple<Collection<Object>, Long> t = info.checkInterval();
+                            IgniteBiTuple<GridContinuousBatch, Long> t = info.checkInterval();
 
-                            Collection<Object> toSnd = t.get1();
+                            final GridContinuousBatch batch = t.get1();
 
-                            if (toSnd != null && !toSnd.isEmpty()) {
+                            if (batch != null && batch.size() > 0) {
                                 try {
+                                    Collection<Object> toSnd = batch.collect();
+
                                     boolean msg = toSnd.iterator().next() instanceof Message;
 
-                                    sendNotification(nodeId, routineId, null, toSnd, hnd.orderedTopic(), msg);
+                                    CI1<IgniteException> ackC = new CI1<IgniteException>() {
+                                        @Override public void apply(IgniteException e) {
+                                            if (e == null)
+                                                info.hnd.onBatchAcknowledged(routineId, batch, ctx);
+                                        }
+                                    };
+
+                                    sendNotification(nodeId,
+                                        routineId,
+                                        null,
+                                        toSnd,
+                                        hnd.orderedTopic(),
+                                        msg,
+                                        ackC);
                                 }
                                 catch (ClusterTopologyCheckedException ignored) {
                                     if (log.isDebugEnabled())
@@ -1013,9 +1110,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
      * @param msg Message.
      * @param orderedTopic Topic for ordered notifications.
      *      If {@code null}, non-ordered message will be sent.
+     * @param ackC Ack closure.
      * @throws IgniteCheckedException In case of error.
      */
-    private void sendWithRetries(UUID nodeId, GridContinuousMessage msg, @Nullable Object orderedTopic)
+    private void sendWithRetries(UUID nodeId, GridContinuousMessage msg, @Nullable Object orderedTopic,
+        IgniteInClosure<IgniteException> ackC)
         throws IgniteCheckedException {
         assert nodeId != null;
         assert msg != null;
@@ -1023,7 +1122,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         ClusterNode node = ctx.discovery().node(nodeId);
 
         if (node != null)
-            sendWithRetries(node, msg, orderedTopic);
+            sendWithRetries(node, msg, orderedTopic, ackC);
         else
             throw new ClusterTopologyCheckedException("Node for provided ID doesn't exist (did it leave the grid?): " + nodeId);
     }
@@ -1033,14 +1132,15 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
      * @param msg Message.
      * @param orderedTopic Topic for ordered notifications.
      *      If {@code null}, non-ordered message will be sent.
+     * @param ackC Ack closure.
      * @throws IgniteCheckedException In case of error.
      */
-    private void sendWithRetries(ClusterNode node, GridContinuousMessage msg, @Nullable Object orderedTopic)
-        throws IgniteCheckedException {
+    private void sendWithRetries(ClusterNode node, GridContinuousMessage msg, @Nullable Object orderedTopic,
+        IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
         assert node != null;
         assert msg != null;
 
-        sendWithRetries(F.asList(node), msg, orderedTopic);
+        sendWithRetries(F.asList(node), msg, orderedTopic, ackC); // Delegates to the collection overload with a one-node list; ackC is passed through unchanged.
     }
 
     /**
@@ -1048,10 +1148,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
      * @param msg Message.
      * @param orderedTopic Topic for ordered notifications.
      *      If {@code null}, non-ordered message will be sent.
+     * @param ackC Ack closure.
      * @throws IgniteCheckedException In case of error.
      */
     private void sendWithRetries(Collection<? extends ClusterNode> nodes, GridContinuousMessage msg,
-        @Nullable Object orderedTopic) throws IgniteCheckedException {
+        @Nullable Object orderedTopic, IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
         assert !F.isEmpty(nodes);
         assert msg != null;
 
@@ -1074,10 +1175,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                             msg,
                             SYSTEM_POOL,
                             0,
-                            true);
+                            true,
+                            ackC);
                     }
                     else
-                        ctx.io().send(node, TOPIC_CONTINUOUS, msg, SYSTEM_POOL);
+                        ctx.io().send(node, TOPIC_CONTINUOUS, msg, SYSTEM_POOL, ackC);
 
                     break;
                 }
@@ -1178,8 +1280,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         /** Lock. */
         private final ReadWriteLock lock = new ReentrantReadWriteLock();
 
-        /** Buffer. */
-        private ConcurrentLinkedDeque8<Object> buf;
+        /** Batch. */
+        private GridContinuousBatch batch;
 
         /** Last send time. */
         private long lastSndTime = U.currentTimeMillis();
@@ -1210,7 +1312,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             this.interval = interval;
             this.autoUnsubscribe = autoUnsubscribe;
 
-            buf = new ConcurrentLinkedDeque8<>();
+            batch = hnd.createBatch();
         }
 
         /**
@@ -1238,21 +1340,53 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         }
 
         /**
+         * @param objs Objects to add (asserted non-null and non-empty).
+         * @return Batch to send: the pending batch with {@code objs} appended; a new batch from {@code hnd.createBatch()} takes its place.
+         */
+        GridContinuousBatch addAll(Collection<?> objs) {
+            assert objs != null;
+            assert objs.size() > 0;
+
+            GridContinuousBatch toSnd = null;
+
+            lock.writeLock().lock(); // Write lock: the batch field is replaced below.
+
+            try {
+                for (Object obj : objs)
+                    batch.add(obj);
+
+                toSnd = batch; // The filled batch is what the caller receives.
+
+                batch = hnd.createBatch();
+
+                if (interval > 0) // lastSndTime is the reference point checkInterval() measures from.
+                    lastSndTime = U.currentTimeMillis();
+            }
+            finally {
+                lock.writeLock().unlock();
+            }
+
+            return toSnd;
+        }
+
+        /**
          * @param obj Object to add.
-         * @return Object to send or {@code null} if there is nothing to send for now.
+         * @return Batch to send or {@code null} if there is nothing to send for now.
          */
-        @Nullable Collection<Object> add(@Nullable Object obj) {
-            ConcurrentLinkedDeque8 buf0 = null;
+        @Nullable GridContinuousBatch add(Object obj) {
+            assert obj != null;
+
+            GridContinuousBatch toSnd = null;
 
-            if (buf.sizex() >= bufSize - 1) {
+            if (batch.size() >= bufSize - 1) {
                 lock.writeLock().lock();
 
                 try {
-                    buf.add(obj);
+                    batch.add(obj);
 
-                    buf0 = buf;
+                    toSnd = batch;
 
-                    buf = new ConcurrentLinkedDeque8<>();
+                    batch = hnd.createBatch();
 
                     if (interval > 0)
                         lastSndTime = U.currentTimeMillis();
@@ -1265,34 +1399,25 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 lock.readLock().lock();
 
                 try {
-                    buf.add(obj);
+                    batch.add(obj);
                 }
                 finally {
                     lock.readLock().unlock();
                 }
             }
 
-            Collection<Object> toSnd = null;
-
-            if (buf0 != null) {
-                toSnd = new ArrayList<>(buf0.sizex());
-
-                for (Object o : buf0)
-                    toSnd.add(o);
-            }
-
             return toSnd;
         }
 
         /**
-         * @return Tuple with objects to sleep (or {@code null} if there is nothing to
+         * @return Tuple with batch to send (or {@code null} if there is nothing to
          *      send for now) and time interval after next check is needed.
          */
         @SuppressWarnings("TooBroadScope")
-        IgniteBiTuple<Collection<Object>, Long> checkInterval() {
+        IgniteBiTuple<GridContinuousBatch, Long> checkInterval() {
             assert interval > 0;
 
-            Collection<Object> toSnd = null;
+            GridContinuousBatch toSnd = null;
             long diff;
 
             long now = U.currentTimeMillis();
@@ -1302,10 +1427,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             try {
                 diff = now - lastSndTime;
 
-                if (diff >= interval && !buf.isEmpty()) {
-                    toSnd = buf;
+                if (diff >= interval && batch.size() > 0) {
+                    toSnd = batch;
 
-                    buf = new ConcurrentLinkedDeque8<>();
+                    batch = hnd.createBatch();
 
                     lastSndTime = now;
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
index bd4aae3..9644372 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
@@ -35,14 +35,19 @@ public class StartRoutineAckDiscoveryMessage extends AbstractContinuousMessage {
     /** */
     private final Map<UUID, IgniteCheckedException> errs;
 
+    /** Update counters for partitions. */
+    private final Map<Integer, Long> updateCntrs;
+
     /**
      * @param routineId Routine id.
      * @param errs Errs.
      */
-    public StartRoutineAckDiscoveryMessage(UUID routineId, Map<UUID, IgniteCheckedException> errs) {
+    public StartRoutineAckDiscoveryMessage(UUID routineId, Map<UUID, IgniteCheckedException> errs,
+        Map<Integer, Long> cntrs) { // cntrs (no @param in the Javadoc): update counters for partitions; may be null, e.g. when ackMessage() passes a never-populated field.
         super(routineId);
 
         this.errs = new HashMap<>(errs);
+        this.updateCntrs = cntrs; // Kept by reference, whereas errs above is copied. NOTE(review): confirm sharing the map is intended.
     }
 
     /** {@inheritDoc} */
@@ -51,6 +56,13 @@ public class StartRoutineAckDiscoveryMessage extends AbstractContinuousMessage {
     }
 
     /**
+     * @return Update counters for partitions, exactly as given to the constructor (same map, not a copy; {@code null} if {@code null} was passed).
+     */
+    public Map<Integer, Long> updateCounters() {
+        return updateCntrs;
+    }
+
+    /**
      * @return Errs.
      */
     public Map<UUID, IgniteCheckedException> errs() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
index 892adac..82c0377 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
@@ -37,6 +37,9 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage {
     /** */
     private final Map<UUID, IgniteCheckedException> errs = new HashMap<>();
 
+    /** Update counters accumulated by {@link #addUpdateCounters(Map)}; {@code null} until the first call. */
+    private Map<Integer, Long> updateCntrs;
+
     /**
      * @param routineId Routine id.
      * @param startReqData Start request data.
@@ -63,6 +66,22 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage {
     }
 
     /**
+     * @param cntrs Update counters to merge in; for each key the larger of the stored and the given value is kept.
+     */
+    public void addUpdateCounters(Map<Integer, Long> cntrs) {
+        if (updateCntrs == null) // Created lazily on the first call.
+            updateCntrs = new HashMap<>();
+
+        for (Map.Entry<Integer, Long> e : cntrs.entrySet()) {
+            Long cntr0 = updateCntrs.get(e.getKey()); // Value already stored for this key, if any.
+            Long cntr1 = e.getValue(); // Incoming value.
+
+            if (cntr0 == null || cntr1 > cntr0) // Store when the key is new or the incoming counter is larger.
+                updateCntrs.put(e.getKey(), cntr1);
+        }
+    }
+
+    /**
      * @return Errs.
      */
     public Map<UUID, IgniteCheckedException> errs() {
@@ -76,7 +95,7 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage {
 
     /** {@inheritDoc} */
     @Override public DiscoveryCustomMessage ackMessage() {
-        return new StartRoutineAckDiscoveryMessage(routineId, errs);
+        return new StartRoutineAckDiscoveryMessage(routineId, errs, updateCntrs); // updateCntrs is still null if addUpdateCounters() was never called.
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index b93acf5..97696bb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -479,7 +479,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
         @Nullable GridCacheVersion drVer,
         UUID subjId,
         String taskName,
-        @Nullable GridCacheVersion dhtVer)
+        @Nullable GridCacheVersion dhtVer,
+        @Nullable Long updateCntr)
         throws IgniteCheckedException, GridCacheEntryRemovedException {
         return new GridCacheUpdateTxResult(true, rawPut(val, ttl));
     }
@@ -529,7 +530,9 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
         boolean conflictResolve,
         boolean intercept,
         UUID subjId,
-        String taskName) throws IgniteCheckedException,
+        String taskName,
+        @Nullable CacheObject prevVal,
+        @Nullable Long updateCntr) throws IgniteCheckedException,
         GridCacheEntryRemovedException {
         assert false;
 
@@ -550,7 +553,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
         @Nullable GridCacheVersion drVer,
         UUID subjId,
         String taskName,
-        @Nullable GridCacheVersion dhtVer
+        @Nullable GridCacheVersion dhtVer,
+        @Nullable Long updateCntr
         ) throws IgniteCheckedException, GridCacheEntryRemovedException {
         obsoleteVer = ver;