You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/11/21 03:01:55 UTC
[17/50] [abbrv] ignite git commit: IGNITE-426 Implemented failover
for Continuous query.
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 1219f2f..72a60d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -78,6 +78,11 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
@GridDirectCollection(CacheObject.class)
private List<CacheObject> vals;
+ /** Previous values. */
+ @GridToStringInclude
+ @GridDirectCollection(CacheObject.class)
+ private List<CacheObject> prevVals;
+
/** Conflict versions. */
@GridDirectCollection(GridCacheVersion.class)
private List<GridCacheVersion> conflictVers;
@@ -139,10 +144,19 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
/** Task name hash. */
private int taskNameHash;
+ /** Partition update counters. */
+ private GridLongList updateCntrs;
+
/** On response flag. Access should be synced on future. */
@GridDirectTransient
private boolean onRes;
+ @GridDirectTransient
+ private List<Integer> partIds;
+
+ @GridDirectTransient
+ private List<CacheObject> localPrevVals;
+
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -193,6 +207,8 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
this.addDepInfo = addDepInfo;
keys = new ArrayList<>();
+ partIds = new ArrayList<>();
+ localPrevVals = new ArrayList<>();
if (forceTransformBackups) {
entryProcessors = new ArrayList<>();
@@ -216,15 +232,25 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
* @param ttl TTL (optional).
* @param conflictExpireTime Conflict expire time (optional).
* @param conflictVer Conflict version (optional).
+ * @param addPrevVal If {@code true} adds previous value.
+ * @param prevVal Previous value.
*/
public void addWriteValue(KeyCacheObject key,
@Nullable CacheObject val,
EntryProcessor<Object, Object, Object> entryProcessor,
long ttl,
long conflictExpireTime,
- @Nullable GridCacheVersion conflictVer) {
+ @Nullable GridCacheVersion conflictVer,
+ boolean addPrevVal,
+ int partId,
+ @Nullable CacheObject prevVal,
+ @Nullable Long updateIdx) {
keys.add(key);
+ partIds.add(partId);
+
+ localPrevVals.add(prevVal);
+
if (forceTransformBackups) {
assert entryProcessor != null;
@@ -233,6 +259,20 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
else
vals.add(val);
+ if (addPrevVal) {
+ if (prevVals == null)
+ prevVals = new ArrayList<>();
+
+ prevVals.add(prevVal);
+ }
+
+ if (updateIdx != null) {
+ if (updateCntrs == null)
+ updateCntrs = new GridLongList();
+
+ updateCntrs.add(updateIdx);
+ }
+
// In case there is no conflict, do not create the list.
if (conflictVer != null) {
if (conflictVers == null) {
@@ -283,8 +323,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
@Nullable CacheObject val,
EntryProcessor<Object, Object, Object> entryProcessor,
long ttl,
- long expireTime)
- {
+ long expireTime) {
if (nearKeys == null) {
nearKeys = new ArrayList<>();
@@ -415,6 +454,25 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
}
/**
+ * @param idx Key index.
+ * @return Partition id.
+ */
+ public int partitionId(int idx) {
+ return partIds.get(idx);
+ }
+
+ /**
+ * @param updCntr Update counter index.
+ * @return Update counter, or {@code null} if no counter is stored at that index.
+ */
+ public Long updateCounter(int updCntr) {
+ if (updateCntrs != null && updCntr < updateCntrs.size())
+ return updateCntrs.get(updCntr);
+
+ return null;
+ }
+
+ /**
* @param idx Near key index.
* @return Key.
*/
@@ -435,6 +493,25 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
/**
* @param idx Key index.
+ * @return Previous value, or {@code null} if previous values were not added.
+ */
+ @Nullable public CacheObject previousValue(int idx) {
+ if (prevVals != null)
+ return prevVals.get(idx);
+
+ return null;
+ }
+
+ /**
+ * @param idx Key index.
+ * @return Previous value from the local (transient) list.
+ */
+ @Nullable public CacheObject localPreviousValue(int idx) {
+ return localPrevVals.get(idx);
+ }
+
+ /**
+ * @param idx Key index.
* @return Entry processor.
*/
@Nullable public EntryProcessor<Object, Object, Object> entryProcessor(int idx) {
@@ -544,8 +621,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
return invokeArgs;
}
- /** {@inheritDoc}
- * @param ctx*/
+ /** {@inheritDoc} */
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
@@ -695,42 +771,54 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
writer.incrementState();
case 16:
- if (!writer.writeUuid("subjId", subjId))
+ if (!writer.writeCollection("prevVals", prevVals, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 17:
- if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
+ if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
case 18:
- if (!writer.writeInt("taskNameHash", taskNameHash))
+ if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
return false;
writer.incrementState();
case 19:
- if (!writer.writeMessage("topVer", topVer))
+ if (!writer.writeInt("taskNameHash", taskNameHash))
return false;
writer.incrementState();
case 20:
- if (!writer.writeMessage("ttls", ttls))
+ if (!writer.writeMessage("topVer", topVer))
return false;
writer.incrementState();
case 21:
- if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
+ if (!writer.writeMessage("ttls", ttls))
return false;
writer.incrementState();
case 22:
+ if (!writer.writeMessage("updateCntrs", updateCntrs))
+ return false;
+
+ writer.incrementState();
+
+ case 23:
+ if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ case 24:
if (!writer.writeMessage("writeVer", writeVer))
return false;
@@ -857,7 +945,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
reader.incrementState();
case 16:
- subjId = reader.readUuid("subjId");
+ prevVals = reader.readCollection("prevVals", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
return false;
@@ -865,6 +953,14 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
reader.incrementState();
case 17:
+ subjId = reader.readUuid("subjId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 18:
byte syncModeOrd;
syncModeOrd = reader.readByte("syncMode");
@@ -876,7 +972,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
reader.incrementState();
- case 18:
+ case 19:
taskNameHash = reader.readInt("taskNameHash");
if (!reader.isLastRead())
@@ -884,7 +980,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
reader.incrementState();
- case 19:
+ case 20:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -892,7 +988,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
reader.incrementState();
- case 20:
+ case 21:
ttls = reader.readMessage("ttls");
if (!reader.isLastRead())
@@ -900,7 +996,15 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
reader.incrementState();
- case 21:
+ case 22:
+ updateCntrs = reader.readMessage("updateCntrs");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 23:
vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -908,7 +1012,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
reader.incrementState();
- case 22:
+ case 24:
writeVer = reader.readMessage("writeVer");
if (!reader.isLastRead())
@@ -928,7 +1032,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 23;
+ return 25;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 2f2944d..43f34c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -613,7 +613,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
if (updateTop) {
for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) {
if (top.cacheId() == cacheCtx.cacheId()) {
- cacheCtx.topology().update(exchId, top.partitionMap(true));
+ cacheCtx.topology().update(exchId,
+ top.partitionMap(true),
+ top.updateCounters());
break;
}
@@ -813,6 +815,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
}
+ boolean topChanged = discoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT;
+
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
if (cacheCtx.isLocal())
continue;
@@ -823,6 +827,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
if (drCacheCtx.isDrEnabled())
drCacheCtx.dr().beforeExchange(topVer, exchId.isLeft());
+ if (topChanged)
+ cacheCtx.continuousQueries().beforeExchange(exchId.topologyVersion());
+
// Partition release future is done so we can flush the write-behind store.
cacheCtx.store().forceFlush();
@@ -956,14 +963,18 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
* @param id ID.
* @throws IgniteCheckedException If failed.
*/
- private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) throws IgniteCheckedException {
+ private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id)
+ throws IgniteCheckedException {
GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id,
clientOnlyExchange,
cctx.versions().last());
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- if (!cacheCtx.isLocal())
+ if (!cacheCtx.isLocal()) {
m.addLocalPartitionMap(cacheCtx.cacheId(), cacheCtx.topology().localPartitionMap());
+
+ m.partitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters());
+ }
}
if (log.isDebugEnabled())
@@ -989,15 +1000,21 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
boolean ready = startTopVer == null || startTopVer.compareTo(id.topologyVersion()) <= 0;
- if (ready)
+ if (ready) {
m.addFullPartitionsMap(cacheCtx.cacheId(), cacheCtx.topology().partitionMap(true));
+
+ m.addPartitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters());
+ }
}
}
// It is important that client topologies be added after contexts.
- for (GridClientPartitionTopology top : cctx.exchange().clientTopologies())
+ for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) {
m.addFullPartitionsMap(top.cacheId(), top.partitionMap(true));
+ m.addPartitionUpdateCounters(top.cacheId(), top.updateCounters());
+ }
+
if (log.isDebugEnabled())
log.debug("Sending full partition map [nodeIds=" + F.viewReadOnly(nodes, F.node2id()) +
", exchId=" + exchId + ", msg=" + m + ']');
@@ -1334,15 +1351,17 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) {
Integer cacheId = entry.getKey();
+ Map<Integer, Long> cntrMap = msg.partitionUpdateCounters(cacheId);
+
GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
if (cacheCtx != null)
- cacheCtx.topology().update(exchId, entry.getValue());
+ cacheCtx.topology().update(exchId, entry.getValue(), cntrMap);
else {
ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, AffinityTopologyVersion.NONE);
if (oldest != null && oldest.isLocal())
- cctx.exchange().clientTopology(cacheId, this).update(exchId, entry.getValue());
+ cctx.exchange().clientTopology(cacheId, this).update(exchId, entry.getValue(), cntrMap);
}
}
}
@@ -1360,7 +1379,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
GridDhtPartitionTopology top = cacheCtx != null ? cacheCtx.topology() :
cctx.exchange().clientTopology(cacheId, this);
- top.update(exchId, entry.getValue());
+ top.update(exchId, entry.getValue(), msg.partitionUpdateCounters(cacheId));
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index c06d773..3f4f9bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
import java.io.Externalizable;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.ignite.IgniteCheckedException;
@@ -48,6 +49,14 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
/** */
private byte[] partsBytes;
+ /** Partitions update counters. */
+ @GridToStringInclude
+ @GridDirectTransient
+ private Map<Integer, Map<Integer, Long>> partCntrs;
+
+ /** Serialized partitions counters. */
+ private byte[] partCntrsBytes;
+
/** Topology version. */
private AffinityTopologyVersion topVer;
@@ -92,13 +101,41 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
parts.put(cacheId, fullMap);
}
- /** {@inheritDoc}
- * @param ctx*/
+ /**
+ * @param cacheId Cache ID.
+ * @param cntrMap Partition update counters.
+ */
+ public void addPartitionUpdateCounters(int cacheId, Map<Integer, Long> cntrMap) {
+ if (partCntrs == null)
+ partCntrs = new HashMap<>();
+
+ if (!partCntrs.containsKey(cacheId))
+ partCntrs.put(cacheId, cntrMap);
+ }
+
+ /**
+ * @param cacheId Cache ID.
+ * @return Partition update counters.
+ */
+ public Map<Integer, Long> partitionUpdateCounters(int cacheId) {
+ if (partCntrs != null) {
+ Map<Integer, Long> res = partCntrs.get(cacheId);
+
+ return res != null ? res : Collections.<Integer, Long>emptyMap();
+ }
+
+ return Collections.emptyMap();
+ }
+
+ /** {@inheritDoc} */
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
if (parts != null && partsBytes == null)
partsBytes = ctx.marshaller().marshal(parts);
+
+ if (partCntrs != null)
+ partCntrsBytes = ctx.marshaller().marshal(partCntrs);
}
/**
@@ -121,6 +158,9 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
if (partsBytes != null && parts == null)
parts = ctx.marshaller().unmarshal(partsBytes, ldr);
+
+ if (partCntrsBytes != null)
+ partCntrs = ctx.marshaller().unmarshal(partCntrsBytes, ldr);
}
/** {@inheritDoc} */
@@ -139,12 +179,18 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
switch (writer.state()) {
case 5:
- if (!writer.writeByteArray("partsBytes", partsBytes))
+ if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
return false;
writer.incrementState();
case 6:
+ if (!writer.writeByteArray("partsBytes", partsBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 7:
if (!writer.writeMessage("topVer", topVer))
return false;
@@ -167,7 +213,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
switch (reader.state()) {
case 5:
- partsBytes = reader.readByteArray("partsBytes");
+ partCntrsBytes = reader.readByteArray("partCntrsBytes");
if (!reader.isLastRead())
return false;
@@ -175,6 +221,14 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
reader.incrementState();
case 6:
+ partsBytes = reader.readByteArray("partsBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 7:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -194,7 +248,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 7;
+ return 8;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index 83fbb1a..a2366bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
import java.io.Externalizable;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.ignite.IgniteCheckedException;
@@ -46,6 +47,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
/** Serialized partitions. */
private byte[] partsBytes;
+ /** Partitions update counters. */
+ @GridToStringInclude
+ @GridDirectTransient
+ private Map<Integer, Map<Integer, Long>> partCntrs;
+
+ /** Serialized partitions counters. */
+ private byte[] partCntrsBytes;
+
/** */
private boolean client;
@@ -90,6 +99,31 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
}
/**
+ * @param cacheId Cache ID.
+ * @param cntrMap Partition update counters.
+ */
+ public void partitionUpdateCounters(int cacheId, Map<Integer, Long> cntrMap) {
+ if (partCntrs == null)
+ partCntrs = new HashMap<>();
+
+ partCntrs.put(cacheId, cntrMap);
+ }
+
+ /**
+ * @param cacheId Cache ID.
+ * @return Partition update counters.
+ */
+ public Map<Integer, Long> partitionUpdateCounters(int cacheId) {
+ if (partCntrs != null) {
+ Map<Integer, Long> res = partCntrs.get(cacheId);
+
+ return res != null ? res : Collections.<Integer, Long>emptyMap();
+ }
+
+ return Collections.emptyMap();
+ }
+
+ /**
* @return Local partitions.
*/
public Map<Integer, GridDhtPartitionMap> partitions() {
@@ -103,6 +137,9 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
if (partsBytes == null && parts != null)
partsBytes = ctx.marshaller().marshal(parts);
+
+ if (partCntrs != null)
+ partCntrsBytes = ctx.marshaller().marshal(partCntrs);
}
/** {@inheritDoc} */
@@ -111,6 +148,9 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
if (partsBytes != null && parts == null)
parts = ctx.marshaller().unmarshal(partsBytes, ldr);
+
+ if (partCntrsBytes != null)
+ partCntrs = ctx.marshaller().unmarshal(partCntrsBytes, ldr);
}
/** {@inheritDoc} */
@@ -135,6 +175,12 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
writer.incrementState();
case 6:
+ if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 7:
if (!writer.writeByteArray("partsBytes", partsBytes))
return false;
@@ -165,6 +211,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
reader.incrementState();
case 6:
+ partCntrsBytes = reader.readByteArray("partCntrsBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 7:
partsBytes = reader.readByteArray("partsBytes");
if (!reader.isLastRead())
@@ -184,7 +238,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 7;
+ return 8;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 1bf03a9..706655b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -249,7 +249,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
/*write-through*/false,
/*read-through*/false,
/*retval*/false,
- /**expiry policy*/null,
+ /*expiry policy*/null,
/*event*/true,
/*metrics*/true,
/*primary*/false,
@@ -263,7 +263,9 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
false,
false,
subjId,
- taskName);
+ taskName,
+ null,
+ null);
if (updRes.removeVersion() != null)
ctx.onDeferredDelete(entry, updRes.removeVersion());
@@ -361,7 +363,9 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
false,
intercept,
req.subjectId(),
- taskName);
+ taskName,
+ null,
+ null);
if (updRes.removeVersion() != null)
ctx.onDeferredDelete(entry, updRes.removeVersion());
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
index d078df4..ba58f57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
@@ -226,6 +226,13 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter {
}
/**
+ * @param cntrs Partition update counters.
+ */
+ @Override public void setPartitionUpdateCounters(long[] cntrs) {
+ // No-op.
+ }
+
+ /**
* Adds owned versions to map.
*
* @param vers Map of owned versions.
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java
new file mode 100644
index 0000000..7db9026
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.GridDirectMap;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Batch acknowledgement.
+ */
+public class CacheContinuousQueryBatchAck extends GridCacheMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Routine ID. */
+ private UUID routineId;
+
+ /** Update counters. */
+ @GridToStringInclude
+ @GridDirectMap(keyType = Integer.class, valueType = Long.class)
+ private Map<Integer, Long> updateCntrs;
+
+ /**
+ * Default constructor.
+ */
+ public CacheContinuousQueryBatchAck() {
+ // No-op.
+ }
+
+ /**
+ * @param cacheId Cache ID.
+ * @param routineId Routine ID.
+ * @param updateCntrs Update counters.
+ */
+ CacheContinuousQueryBatchAck(int cacheId, UUID routineId, Map<Integer, Long> updateCntrs) {
+ this.cacheId = cacheId;
+ this.routineId = routineId;
+ this.updateCntrs = updateCntrs;
+ }
+
+ /**
+ * @return Routine ID.
+ */
+ UUID routineId() {
+ return routineId;
+ }
+
+ /**
+ * @return Update counters.
+ */
+ Map<Integer, Long> updateCntrs() {
+ return updateCntrs;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 3:
+ if (!writer.writeUuid("routineId", routineId))
+ return false;
+
+ writer.incrementState();
+
+ case 4:
+ if (!writer.writeMap("updateCntrs", updateCntrs, MessageCollectionItemType.INT,
+ MessageCollectionItemType.LONG))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 3:
+ routineId = reader.readUuid("routineId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 4:
+ updateCntrs = reader.readMap("updateCntrs", MessageCollectionItemType.INT,
+ MessageCollectionItemType.LONG, false);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(CacheContinuousQueryBatchAck.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean addDeploymentInfo() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 118;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 5;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CacheContinuousQueryBatchAck.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
index a4b35eb..0495e6d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
@@ -22,10 +22,12 @@ import javax.cache.event.EventType;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -42,6 +44,12 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
private static final long serialVersionUID = 0L;
/** */
+ private static final byte BACKUP_ENTRY = 0b0001;
+
+ /** */
+ private static final byte FILTERED_ENTRY = 0b0010;
+
+ /** */
private static final EventType[] EVT_TYPE_VALS = EventType.values();
/**
@@ -75,8 +83,24 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
@GridDirectTransient
private GridDeploymentInfo depInfo;
+ /** Partition. */
+ private int part;
+
+ /** Update counter. */
+ private long updateCntr;
+
+ /** Flags. */
+ private byte flags;
+
+ /** */
+ @GridToStringInclude
+ private AffinityTopologyVersion topVer;
+
+ /** Filtered events. */
+ private GridLongList filteredEvts;
+
/**
- * Required by {@link org.apache.ignite.plugin.extensions.communication.Message}.
+ * Required by {@link Message}.
*/
public CacheContinuousQueryEntry() {
// No-op.
@@ -88,18 +112,34 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
* @param key Key.
* @param newVal New value.
* @param oldVal Old value.
+ * @param part Partition.
+ * @param updateCntr Partition update counter.
+ * @param topVer Topology version if applicable.
*/
CacheContinuousQueryEntry(
int cacheId,
EventType evtType,
KeyCacheObject key,
@Nullable CacheObject newVal,
- @Nullable CacheObject oldVal) {
+ @Nullable CacheObject oldVal,
+ int part,
+ long updateCntr,
+ @Nullable AffinityTopologyVersion topVer) {
this.cacheId = cacheId;
this.evtType = evtType;
this.key = key;
this.newVal = newVal;
this.oldVal = oldVal;
+ this.part = part;
+ this.updateCntr = updateCntr;
+ this.topVer = topVer;
+ }
+
+ /**
+ * @return Topology version stored in this entry, or {@code null} if not applicable.
+ */
+ @Nullable AffinityTopologyVersion topologyVersion() {
+ return topVer;
}
/**
@@ -117,6 +157,66 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
}
/**
+ * @return Partition stored in this entry.
+ */
+ int partition() {
+ return part;
+ }
+
+ /**
+ * @return Partition update counter stored in this entry.
+ */
+ long updateCounter() {
+ return updateCntr;
+ }
+
+ /**
+ * Marks this entry as created on a backup node (sets the {@code BACKUP_ENTRY} flag).
+ */
+ void markBackup() {
+ flags |= BACKUP_ENTRY;
+ }
+
+ /**
+ * Marks this entry as filtered and drops its key, values and deployment info.
+ */
+ void markFiltered() {
+ flags |= FILTERED_ENTRY;
+ newVal = null;
+ oldVal = null;
+ key = null;
+ depInfo = null;
+ }
+
+ /**
+ * @return {@code True} if entry was marked by {@link #markBackup()}, i.e. sent by backup node.
+ */
+ boolean isBackup() {
+ return (flags & BACKUP_ENTRY) != 0;
+ }
+
+ /**
+ * @return {@code True} if entry was marked by {@link #markFiltered()}.
+ */
+ boolean isFiltered() {
+ return (flags & FILTERED_ENTRY) != 0;
+ }
+
+ /**
+ * @param cntrs Update counters of filtered events to attach to this entry.
+ */
+ void filteredEvents(GridLongList cntrs) {
+ filteredEvts = cntrs;
+ }
+
+ /**
+ * @return Update counters of previously filtered events, or {@code null} if none were attached.
+ */
+ long[] filteredEvents() {
+ return filteredEvts == null ? null : filteredEvts.array();
+ }
+
+ /**
* @param cctx Cache context.
* @throws IgniteCheckedException In case of error.
*/
@@ -138,13 +238,15 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
* @throws IgniteCheckedException In case of error.
*/
void unmarshal(GridCacheContext cctx, @Nullable ClassLoader ldr) throws IgniteCheckedException {
- key.finishUnmarshal(cctx.cacheObjectContext(), ldr);
+ if (!isFiltered()) {
+ key.finishUnmarshal(cctx.cacheObjectContext(), ldr);
- if (newVal != null)
- newVal.finishUnmarshal(cctx.cacheObjectContext(), ldr);
+ if (newVal != null)
+ newVal.finishUnmarshal(cctx.cacheObjectContext(), ldr);
- if (oldVal != null)
- oldVal.finishUnmarshal(cctx.cacheObjectContext(), ldr);
+ if (oldVal != null)
+ oldVal.finishUnmarshal(cctx.cacheObjectContext(), ldr);
+ }
}
/**
@@ -208,23 +310,53 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
writer.incrementState();
case 2:
- if (!writer.writeMessage("key", key))
+ if (!writer.writeMessage("filteredEvts", filteredEvts))
return false;
writer.incrementState();
case 3:
- if (!writer.writeMessage("newVal", newVal))
+ if (!writer.writeByte("flags", flags))
return false;
writer.incrementState();
case 4:
+ if (!writer.writeMessage("key", key))
+ return false;
+
+ writer.incrementState();
+
+ case 5:
+ if (!writer.writeMessage("newVal", newVal))
+ return false;
+
+ writer.incrementState();
+
+ case 6:
if (!writer.writeMessage("oldVal", oldVal))
return false;
writer.incrementState();
+ case 7:
+ if (!writer.writeInt("part", part))
+ return false;
+
+ writer.incrementState();
+
+ case 8:
+ if (!writer.writeMessage("topVer", topVer))
+ return false;
+
+ writer.incrementState();
+
+ case 9:
+ if (!writer.writeLong("updateCntr", updateCntr))
+ return false;
+
+ writer.incrementState();
+
}
return true;
@@ -259,7 +391,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
reader.incrementState();
case 2:
- key = reader.readMessage("key");
+ filteredEvts = reader.readMessage("filteredEvts");
if (!reader.isLastRead())
return false;
@@ -267,7 +399,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
reader.incrementState();
case 3:
- newVal = reader.readMessage("newVal");
+ flags = reader.readByte("flags");
if (!reader.isLastRead())
return false;
@@ -275,6 +407,22 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
reader.incrementState();
case 4:
+ key = reader.readMessage("key");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 5:
+ newVal = reader.readMessage("newVal");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 6:
oldVal = reader.readMessage("oldVal");
if (!reader.isLastRead())
@@ -282,6 +430,30 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
reader.incrementState();
+ case 7:
+ part = reader.readInt("part");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 8:
+ topVer = reader.readMessage("topVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 9:
+ updateCntr = reader.readLong("updateCntr");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
}
return reader.afterMessageRead(CacheContinuousQueryEntry.class);
@@ -289,7 +461,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 5;
+ return 10;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
index 7417138..a1ebe39 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
@@ -58,8 +58,7 @@ class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K, V> {
}
/** {@inheritDoc} */
- @Override
- public K getKey() {
+ @Override public K getKey() {
return e.key().value(cctx.cacheObjectContext(), false);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index e517c70..b69d4cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -21,8 +21,21 @@ import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeMap;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryUpdatedListener;
import javax.cache.event.EventType;
@@ -30,26 +43,37 @@ import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.CacheQueryExecutedEvent;
import org.apache.ignite.events.CacheQueryReadEvent;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteDeploymentCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager;
import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
+import org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
+import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter;
import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryFilter;
+import org.apache.ignite.internal.util.GridConcurrentSkipListSet;
+import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgnitePredicate;
import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentLinkedDeque8;
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
@@ -61,6 +85,9 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
/** */
private static final long serialVersionUID = 0L;
+ /** Number of buffered acknowledged entries at which an acknowledge is produced for backups. */
+ private static final int BACKUP_ACK_THRESHOLD = 100;
+
/** Cache name. */
private String cacheName;
@@ -97,9 +124,27 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
/** Whether to skip primary check for REPLICATED cache. */
private transient boolean skipPrimaryCheck;
+ /** Backup queue: entries marked as backup that are kept until cleaned up, evicted or flushed. */
+ private transient Collection<CacheContinuousQueryEntry> backupQueue;
+
+ /** {@code True} if local cache. */
+ private boolean localCache;
+
+ /** Recovery state per partition, used when entries are delivered to the local listener. */
+ private transient ConcurrentMap<Integer, PartitionRecovery> rcvs;
+
+ /** Entry buffer per partition, used before entries are sent as notifications. */
+ private transient ConcurrentMap<Integer, EntryBuffer> entryBufs;
+
+ /** Buffer accumulating acknowledged entries before an acknowledge is sent to backups. */
+ private transient AcknowledgeBuffer ackBuf;
+
/** */
private transient int cacheId;
+ /** Initial update counters by partition; set by {@code updateCounters()}, may be {@code null}. */
+ private Map<Integer, Long> initUpdCntrs;
+
/**
* Required by {@link Externalizable}.
*/
@@ -121,6 +166,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
* @param ignoreExpired Ignore expired events flag.
* @param skipPrimaryCheck Whether to skip primary check for REPLICATED cache.
* @param taskHash Task name hash code.
+ * @param locCache {@code True} if local cache.
*/
public CacheContinuousQueryHandler(
String cacheName,
@@ -133,7 +179,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
boolean sync,
boolean ignoreExpired,
int taskHash,
- boolean skipPrimaryCheck) {
+ boolean skipPrimaryCheck,
+ boolean locCache) {
assert topic != null;
assert locLsnr != null;
@@ -148,6 +195,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
this.ignoreExpired = ignoreExpired;
this.taskHash = taskHash;
this.skipPrimaryCheck = skipPrimaryCheck;
+ this.localCache = locCache;
cacheId = CU.cacheId(cacheName);
}
@@ -173,6 +221,11 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
}
/** {@inheritDoc} */
+ @Override public void updateCounters(Map<Integer, Long> cntrs) {
+ this.initUpdCntrs = cntrs; // Kept by reference: register() later puts local topology counters into this same map.
+ }
+
+ /** {@inheritDoc} */
@Override public RegisterStatus register(final UUID nodeId, final UUID routineId, final GridKernalContext ctx)
throws IgniteCheckedException {
assert nodeId != null;
@@ -185,8 +238,32 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
if (rmtFilter != null)
ctx.resource().injectGeneric(rmtFilter);
+ entryBufs = new ConcurrentHashMap<>();
+
+ backupQueue = new ConcurrentLinkedDeque8<>();
+
+ ackBuf = new AcknowledgeBuffer();
+
+ rcvs = new ConcurrentHashMap<>();
+
final boolean loc = nodeId.equals(ctx.localNodeId());
+ assert !skipPrimaryCheck || loc;
+
+ final GridCacheContext<K, V> cctx = cacheContext(ctx);
+
+ if (!internal && cctx != null && initUpdCntrs != null) {
+ Map<Integer, Long> map = cctx.topology().updateCounters();
+
+ for (Map.Entry<Integer, Long> e : map.entrySet()) {
+ Long cntr0 = initUpdCntrs.get(e.getKey());
+ Long cntr1 = e.getValue();
+
+ if (cntr0 == null || cntr1 > cntr0)
+ initUpdCntrs.put(e.getKey(), cntr1);
+ }
+ }
+
CacheContinuousQueryListener<K, V> lsnr = new CacheContinuousQueryListener<K, V>() {
@Override public void onExecution() {
if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
@@ -212,11 +289,15 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
if (ignoreExpired && evt.getEventType() == EventType.EXPIRED)
return;
- GridCacheContext<K, V> cctx = cacheContext(ctx);
+ final GridCacheContext<K, V> cctx = cacheContext(ctx);
- if (cctx.isReplicated() && !skipPrimaryCheck && !primary)
+ // Check that cache stopped.
+ if (cctx == null)
return;
+ // skipPrimaryCheck is set only when listen locally for replicated cache events.
+ assert !skipPrimaryCheck || (cctx.isReplicated() && ctx.localNodeId().equals(nodeId));
+
boolean notify = true;
if (rmtFilter != null) {
@@ -228,54 +309,94 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
}
}
- if (notify) {
- if (loc)
- locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt));
- else {
- try {
- if (cctx.deploymentEnabled() && ctx.discovery().node(nodeId) != null) {
- evt.entry().prepareMarshal(cctx);
-
- cctx.deploy().prepare(evt.entry());
+ try {
+ final CacheContinuousQueryEntry entry = evt.entry();
+
+ if (!notify)
+ entry.markFiltered();
+
+ if (primary || skipPrimaryCheck) {
+ if (loc) {
+ if (!localCache) {
+ Collection<CacheContinuousQueryEntry> entries = handleEvent(ctx, entry);
+
+ if (!entries.isEmpty()) {
+ final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name());
+
+ Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries,
+ new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends V>>() {
+ @Override public CacheEntryEvent<? extends K, ? extends V> apply(
+ CacheContinuousQueryEntry e) {
+ return new CacheContinuousQueryEvent<>(cache, cctx, e);
+ }
+ },
+ new IgnitePredicate<CacheContinuousQueryEntry>() {
+ @Override public boolean apply(CacheContinuousQueryEntry entry) {
+ return !entry.isFiltered();
+ }
+ }
+ );
+
+ locLsnr.onUpdated(evts);
+
+ if (!internal && !skipPrimaryCheck)
+ sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx);
+ }
+ }
+ else {
+ if (!entry.isFiltered())
+ locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt));
}
- else
- evt.entry().prepareMarshal(cctx);
-
- ctx.continuous().addNotification(nodeId, routineId, evt.entry(), topic, sync, true);
}
- catch (ClusterTopologyCheckedException ex) {
- IgniteLogger log = ctx.log(getClass());
+ else {
+ if (!entry.isFiltered())
+ prepareEntry(cctx, nodeId, entry);
- if (log.isDebugEnabled())
- log.debug("Failed to send event notification to node, node left cluster " +
- "[node=" + nodeId + ", err=" + ex + ']');
- }
- catch (IgniteCheckedException ex) {
- U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex);
+ CacheContinuousQueryEntry e = handleEntry(entry);
+
+ if (e != null)
+ ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true);
}
}
+ else {
+ if (!internal) {
+ entry.markBackup();
- if (recordIgniteEvt) {
- ctx.event().record(new CacheQueryReadEvent<>(
- ctx.discovery().localNode(),
- "Continuous query executed.",
- EVT_CACHE_QUERY_OBJECT_READ,
- CacheQueryType.CONTINUOUS.name(),
- cacheName,
- null,
- null,
- null,
- rmtFilter,
- null,
- nodeId,
- taskName(),
- evt.getKey(),
- evt.getValue(),
- evt.getOldValue(),
- null
- ));
+ backupQueue.add(entry);
+ }
}
}
+ catch (ClusterTopologyCheckedException ex) {
+ IgniteLogger log = ctx.log(getClass());
+
+ if (log.isDebugEnabled())
+ log.debug("Failed to send event notification to node, node left cluster " +
+ "[node=" + nodeId + ", err=" + ex + ']');
+ }
+ catch (IgniteCheckedException ex) {
+ U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex);
+ }
+
+ if (recordIgniteEvt && notify) {
+ ctx.event().record(new CacheQueryReadEvent<>(
+ ctx.discovery().localNode(),
+ "Continuous query executed.",
+ EVT_CACHE_QUERY_OBJECT_READ,
+ CacheQueryType.CONTINUOUS.name(),
+ cacheName,
+ null,
+ null,
+ null,
+ rmtFilter,
+ null,
+ nodeId,
+ taskName(),
+ evt.getKey(),
+ evt.getValue(),
+ evt.getOldValue(),
+ null
+ ));
+ }
}
@Override public void onUnregister() {
@@ -283,6 +404,85 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
((PlatformContinuousQueryFilter)rmtFilter).onQueryUnregister();
}
+ @Override public void cleanupBackupQueue(Map<Integer, Long> updateCntrs) { // Drops queued backup entries covered by the given per-partition counters.
+ Iterator<CacheContinuousQueryEntry> it = backupQueue.iterator();
+
+ while (it.hasNext()) {
+ CacheContinuousQueryEntry backupEntry = it.next();
+
+ Long updateCntr = updateCntrs.get(backupEntry.partition()); // null when the map has no counter for this partition.
+
+ if (updateCntr != null && backupEntry.updateCounter() <= updateCntr) // Entry counter is at or below the given counter.
+ it.remove();
+ }
+ }
+
+ @Override public void flushBackupQueue(GridKernalContext ctx, AffinityTopologyVersion topVer) { // Sends all queued backup entries to the routine's node; topVer is not used in this body.
+ if (backupQueue.isEmpty())
+ return;
+
+ try {
+ GridCacheContext<K, V> cctx = cacheContext(ctx); // NOTE(review): the entry-update callback of this listener returns early on a null context; confirm null is impossible here.
+
+ for (CacheContinuousQueryEntry e : backupQueue) {
+ if (!e.isFiltered()) // markFiltered() cleared key and values, so filtered entries are not prepared.
+ prepareEntry(cctx, nodeId, e);
+ }
+
+ ctx.continuous().addBackupNotification(nodeId, routineId, backupQueue, topic);
+
+ backupQueue.clear(); // NOTE(review): an entry added after the call above but before this clear() would be discarded; confirm adds cannot race with a flush.
+ }
+ catch (IgniteCheckedException e) {
+ U.error(ctx.log(getClass()), "Failed to send backup event notification to node: " + nodeId, e);
+ }
+ }
+
+ @Override public void acknowledgeBackupOnTimeout(GridKernalContext ctx) {
+ sendBackupAcknowledge(ackBuf.acknowledgeOnTimeout(), routineId, ctx); // acknowledgeOnTimeout() yields data only if entries are buffered; a null tuple sends nothing.
+ }
+
+ @Override public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer) { // Registers evt as skipped in its partition buffer; topVer is not used in this body.
+ try {
+ assert evt != null;
+
+ CacheContinuousQueryEntry e = evt.entry();
+
+ EntryBuffer buf = entryBufs.get(e.partition());
+
+ if (buf == null) { // Lazily create the partition buffer; putIfAbsent keeps a buffer created concurrently.
+ buf = new EntryBuffer();
+
+ EntryBuffer oldRec = entryBufs.putIfAbsent(e.partition(), buf);
+
+ if (oldRec != null)
+ buf = oldRec;
+ }
+
+ e = buf.skipEntry(e); // null means the counter was only remembered in the buffer and nothing is sent now.
+
+ if (e != null)
+ ctx.continuous().addNotification(nodeId, routineId, e, topic, sync, true);
+ }
+ catch (ClusterTopologyCheckedException ex) {
+ IgniteLogger log = ctx.log(getClass());
+
+ if (log.isDebugEnabled())
+ log.debug("Failed to send event notification to node, node left cluster " +
+ "[node=" + nodeId + ", err=" + ex + ']');
+ }
+ catch (IgniteCheckedException ex) {
+ U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex);
+ }
+ }
+
+ @Override public void onPartitionEvicted(int part) { // Removes every queued backup entry that belongs to the given partition.
+ for (Iterator<CacheContinuousQueryEntry> it = backupQueue.iterator(); it.hasNext();) {
+ if (it.next().partition() == part)
+ it.remove();
+ }
+ }
+
@Override public boolean oldValueRequired() {
return oldValRequired;
}
@@ -304,6 +504,23 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
return mgr.registerListener(routineId, lsnr, internal);
}
+ /**
+ * @param cctx Cache context used to marshal the entry and, if needed, prepare deployment info.
+ * @param nodeId ID of the node that started routine.
+ * @param entry Entry to prepare for sending.
+ * @throws IgniteCheckedException In case of error.
+ */
+ private void prepareEntry(GridCacheContext cctx, UUID nodeId, CacheContinuousQueryEntry entry)
+ throws IgniteCheckedException {
+ if (cctx.kernalContext().config().isPeerClassLoadingEnabled() && cctx.discovery().node(nodeId) != null) { // Deployment is prepared only with peer class loading on and the node known to discovery.
+ entry.prepareMarshal(cctx);
+
+ cctx.deploy().prepare(entry);
+ }
+ else
+ entry.prepareMarshal(cctx); // The entry is marshalled in both branches.
+ }
+
/** {@inheritDoc} */
@Override public void onListenerRegistered(UUID routineId, GridKernalContext ctx) {
// No-op.
@@ -366,17 +583,377 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name());
- Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries,
+ Collection<CacheContinuousQueryEntry> entries0 = new ArrayList<>();
+
+ for (CacheContinuousQueryEntry e : entries)
+ entries0.addAll(handleEvent(ctx, e));
+
+ Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries0,
new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends V>>() {
@Override public CacheEntryEvent<? extends K, ? extends V> apply(CacheContinuousQueryEntry e) {
return new CacheContinuousQueryEvent<>(cache, cctx, e);
}
+ },
+ new IgnitePredicate<CacheContinuousQueryEntry>() {
+ @Override public boolean apply(CacheContinuousQueryEntry entry) {
+ return !entry.isFiltered();
+ }
}
);
locLsnr.onUpdated(evts);
}
+ /**
+ * @param ctx Context.
+ * @param e Entry to be delivered to the local listener.
+ * @return Entries that can be delivered now (possibly empty, possibly including earlier buffered entries).
+ */
+ private Collection<CacheContinuousQueryEntry> handleEvent(GridKernalContext ctx,
+ CacheContinuousQueryEntry e) {
+ assert e != null;
+
+ if (internal) {
+ if (e.isFiltered())
+ return Collections.emptyList();
+ else
+ return F.asList(e);
+ }
+
+ // Initial query entry or evicted entry.
+ // These events should be fired immediately.
+ if (e.updateCounter() == -1)
+ return F.asList(e);
+
+ PartitionRecovery rec = rcvs.get(e.partition());
+
+ if (rec == null) { // Lazily create recovery state for the partition; putIfAbsent keeps a concurrently created one.
+ rec = new PartitionRecovery(ctx.log(getClass()), cacheContext(ctx).topology().topologyVersion(),
+ initUpdCntrs == null ? null : initUpdCntrs.get(e.partition()));
+
+ PartitionRecovery oldRec = rcvs.putIfAbsent(e.partition(), rec);
+
+ if (oldRec != null)
+ rec = oldRec;
+ }
+
+ return rec.collectEntries(e);
+ }
+
+ /**
+ * @param e Entry about to be sent as a notification.
+ * @return Entry to send, or {@code null} if nothing should be sent for it now.
+ */
+ private CacheContinuousQueryEntry handleEntry(CacheContinuousQueryEntry e) {
+ assert e != null;
+ assert entryBufs != null;
+
+ if (internal) {
+ if (e.isFiltered())
+ return null;
+ else
+ return e;
+ }
+
+ // Initial query entry.
+ // These events should be fired immediately.
+ if (e.updateCounter() == -1)
+ return e;
+
+ EntryBuffer buf = entryBufs.get(e.partition());
+
+ if (buf == null) { // Lazily create the partition buffer; putIfAbsent keeps a concurrently created one.
+ buf = new EntryBuffer();
+
+ EntryBuffer oldRec = entryBufs.putIfAbsent(e.partition(), buf);
+
+ if (oldRec != null)
+ buf = oldRec;
+ }
+
+ return buf.handle(e);
+ }
+
+ /**
+ * Per-partition buffer that releases received entries in update counter order and drops duplicates.
+ */
+ private static class PartitionRecovery {
+ /** Event which means hole in sequence. */
+ private static final CacheContinuousQueryEntry HOLE = new CacheContinuousQueryEntry();
+
+ /** Number of pending events at which buffered events are released even if counters are missing. */
+ private final static int MAX_BUFF_SIZE = 100;
+
+ /** Logger. */
+ private IgniteLogger log;
+
+ /** Update counter of the last released event. */
+ private long lastFiredEvt;
+
+ /** Topology version of the last seen entry; {@code NONE} until an initial counter or the first entry arrives. */
+ private AffinityTopologyVersion curTop = AffinityTopologyVersion.NONE;
+
+ /** Pending events sorted by update counter; accessed under synchronization on this map. */
+ private final Map<Long, CacheContinuousQueryEntry> pendingEvts = new TreeMap<>();
+
+ /**
+ * @param log Logger.
+ * @param topVer Topology version, used only when {@code initCntr} is not {@code null}.
+ * @param initCntr Initial update counter of the partition, or {@code null} if unknown.
+ */
+ public PartitionRecovery(IgniteLogger log, AffinityTopologyVersion topVer, @Nullable Long initCntr) {
+ this.log = log;
+
+ if (initCntr != null) {
+ this.lastFiredEvt = initCntr;
+
+ curTop = topVer;
+ }
+ }
+
+ /**
+ * Adds a continuous entry and collects the entries that can be fired now.
+ *
+ * @param entry Cache continuous query entry.
+ * @return Entries which will be fired (possibly empty).
+ */
+ public Collection<CacheContinuousQueryEntry> collectEntries(CacheContinuousQueryEntry entry) {
+ assert entry != null;
+
+ List<CacheContinuousQueryEntry> entries;
+
+ synchronized (pendingEvts) {
+ // First event and no initial counter was given: start the sequence from this entry.
+ if (curTop == AffinityTopologyVersion.NONE) {
+ lastFiredEvt = entry.updateCounter();
+
+ curTop = entry.topologyVersion();
+
+ return F.asList(entry);
+ }
+
+ if (curTop.compareTo(entry.topologyVersion()) < 0) { // NOTE(review): topologyVersion() is @Nullable; confirm it is never null here.
+ if (entry.updateCounter() == 1 && !entry.isBackup()) { // Counter restarted at 1 on a newer topology: release everything pending and restart.
+ entries = new ArrayList<>(pendingEvts.size());
+
+ for (CacheContinuousQueryEntry evt : pendingEvts.values()) {
+ if (evt != HOLE && !evt.isFiltered())
+ entries.add(evt);
+ }
+
+ pendingEvts.clear();
+
+ curTop = entry.topologyVersion();
+
+ lastFiredEvt = entry.updateCounter();
+
+ entries.add(entry);
+
+ return entries;
+ }
+
+ curTop = entry.topologyVersion();
+ }
+
+ // Counters not above the last fired one are duplicates.
+ if (entry.updateCounter() > lastFiredEvt) {
+ pendingEvts.put(entry.updateCounter(), entry);
+
+ // Register counters of filtered events as holes so they do not block the sequence.
+ if (entry.filteredEvents() != null) {
+ for (long cnrt : entry.filteredEvents()) {
+ if (cnrt > lastFiredEvt)
+ pendingEvts.put(cnrt, HOLE);
+ }
+ }
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Skip duplicate continuous query message: " + entry);
+
+ return Collections.emptyList();
+ }
+
+ if (pendingEvts.isEmpty())
+ return Collections.emptyList();
+
+ Iterator<Map.Entry<Long, CacheContinuousQueryEntry>> iter = pendingEvts.entrySet().iterator();
+
+ entries = new ArrayList<>();
+
+ if (pendingEvts.size() >= MAX_BUFF_SIZE) { // Buffer is full: release the oldest 90% regardless of gaps.
+ for (int i = 0; i < MAX_BUFF_SIZE - (MAX_BUFF_SIZE / 10); i++) {
+ Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next();
+
+ if (e.getValue() != HOLE && !e.getValue().isFiltered())
+ entries.add(e.getValue());
+
+ lastFiredEvt = e.getKey();
+
+ iter.remove();
+ }
+ }
+ else {
+ // Release only the gap-free run of counters that directly follows the last fired one.
+ while (iter.hasNext()) {
+ Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next();
+
+ if (e.getKey() == lastFiredEvt + 1) {
+ ++lastFiredEvt;
+
+ if (e.getValue() != HOLE && !e.getValue().isFiltered())
+ entries.add(e.getValue());
+
+ iter.remove();
+ }
+ else
+ break;
+ }
+ }
+ }
+
+ return entries;
+ }
+ }
+
+ /**
+ * Per-partition buffer of update counters of filtered entries that were not sent.
+ */
+ private static class EntryBuffer {
+ /** Counter span of buffered holes at which a filtered entry is returned carrying all of them. */
+ private final static int MAX_BUFF_SIZE = 100;
+
+ /** Update counters of filtered entries that were buffered instead of being returned. */
+ private final GridConcurrentSkipListSet<Long> buf = new GridConcurrentSkipListSet<>();
+
+ /** Highest update counter of a non-filtered entry seen by {@code handle()}. */
+ private AtomicLong lastFiredCntr = new AtomicLong();
+
+ /**
+ * @param newVal New value.
+ * @return Old value if previous value less than new value otherwise {@code -1}.
+ */
+ private long updateFiredCounter(long newVal) {
+ long prevVal = lastFiredCntr.get();
+
+ while (prevVal < newVal) { // CAS loop: only ever raises the counter.
+ if (lastFiredCntr.compareAndSet(prevVal, newVal))
+ return prevVal;
+ else
+ prevVal = lastFiredCntr.get();
+ }
+
+ return prevVal >= newVal ? -1 : prevVal;
+ }
+
+ /**
+ * Marks the entry filtered and returns it, or only buffers its update counter and returns {@code null}.
+ * @param e Entry.
+ * @return Entry marked as filtered, or {@code null} if only its counter was buffered.
+ */
+ private CacheContinuousQueryEntry skipEntry(CacheContinuousQueryEntry e) {
+ if (lastFiredCntr.get() > e.updateCounter() || e.updateCounter() == 1) {
+
+ e.markFiltered();
+
+ return e;
+ }
+ else {
+ buf.add(e.updateCounter());
+
+ // Double check: another thread may have fired an event with a higher counter meanwhile.
+ if (lastFiredCntr.get() > e.updateCounter() && buf.contains(e.updateCounter())) {
+ buf.remove(e.updateCounter());
+
+ e.markFiltered();
+
+ return e;
+ }
+ else
+ return null;
+ }
+ }
+
+ /**
+ * Processes a continuous entry before it is sent.
+ *
+ * @param e Cache continuous query entry.
+ * @return Entry to send, or {@code null} if the filtered entry's counter was only buffered.
+ */
+ public CacheContinuousQueryEntry handle(CacheContinuousQueryEntry e) {
+ assert e != null;
+
+ if (e.isFiltered()) {
+ Long last = buf.lastx();
+ Long first = buf.firstx();
+
+ if (last != null && first != null && last - first >= MAX_BUFF_SIZE) { // Too wide a span of holes: drain them into this entry and return it.
+ NavigableSet<Long> prevHoles = buf.subSet(first, true, last, true);
+
+ GridLongList filteredEvts = new GridLongList((int)(last - first));
+
+ int size = 0;
+
+ Long cntr;
+
+ while ((cntr = prevHoles.pollFirst()) != null) {
+ filteredEvts.add(cntr);
+
+ ++size;
+ }
+
+ filteredEvts.truncate(size, true);
+
+ e.filteredEvents(filteredEvts);
+
+ return e;
+ }
+
+ if (lastFiredCntr.get() > e.updateCounter() || e.updateCounter() == 1)
+ return e;
+ else {
+ buf.add(e.updateCounter());
+
+ // Double check: another thread may have fired an event with a higher counter meanwhile.
+ if (lastFiredCntr.get() > e.updateCounter() && buf.contains(e.updateCounter())) {
+ buf.remove(e.updateCounter());
+
+ return e;
+ }
+ else
+ return null;
+ }
+ }
+ else {
+ long prevVal = updateFiredCounter(e.updateCounter());
+
+ if (prevVal == -1) // Counter was not advanced by this entry: return it without attaching holes.
+ return e;
+ else {
+ NavigableSet<Long> prevHoles = buf.subSet(prevVal, true, e.updateCounter(), true); // Holes between the previous fired counter and this one.
+
+ GridLongList filteredEvts = new GridLongList((int)(e.updateCounter() - prevVal));
+
+ int size = 0;
+
+ Long cntr;
+
+ while ((cntr = prevHoles.pollFirst()) != null) {
+ filteredEvts.add(cntr);
+
+ ++size;
+ }
+
+ filteredEvts.truncate(size, true);
+
+ e.filteredEvents(filteredEvts);
+
+ return e;
+ }
+ }
+ }
+ }
+
/** {@inheritDoc} */
@Override public void p2pMarshal(GridKernalContext ctx) throws IgniteCheckedException {
assert ctx != null;
@@ -397,6 +974,65 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
}
/** {@inheritDoc} */
+ @Override public GridContinuousBatch createBatch() {
+ return new GridContinuousBatchAdapter(); // A new adapter instance on every call.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onBatchAcknowledged(final UUID routineId,
+ GridContinuousBatch batch,
+ final GridKernalContext ctx) {
+ sendBackupAcknowledge(ackBuf.onAcknowledged(batch), routineId, ctx); // onAcknowledged() returns null below BACKUP_ACK_THRESHOLD; a null tuple sends nothing.
+ }
+
+ /**
+ * @param t Acknowledge information (update counters by partition, topology versions); {@code null} sends nothing.
+ * @param routineId Routine ID.
+ * @param ctx Context.
+ */
+ private void sendBackupAcknowledge(final IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> t,
+ final UUID routineId,
+ final GridKernalContext ctx) {
+ if (t != null) {
+ ctx.closure().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ GridCacheContext<K, V> cctx = cacheContext(ctx); // NOTE(review): the entry-update callback checks this for null (cache stopped); confirm null is impossible here.
+
+ CacheContinuousQueryBatchAck msg = new CacheContinuousQueryBatchAck(cctx.cacheId(),
+ routineId,
+ t.get1());
+
+ Collection<ClusterNode> nodes = new HashSet<>();
+
+ for (AffinityTopologyVersion topVer : t.get2()) // Union of cache nodes over every topology version in the tuple.
+ nodes.addAll(ctx.discovery().cacheNodes(topVer));
+
+ for (ClusterNode node : nodes) {
+ if (!node.id().equals(ctx.localNodeId())) { // The local node is not sent the acknowledge.
+ try {
+ cctx.io().send(node, msg, GridIoPolicy.SYSTEM_POOL);
+ }
+ catch (ClusterTopologyCheckedException e) {
+ IgniteLogger log = ctx.log(getClass());
+
+ if (log.isDebugEnabled())
+ log.debug("Failed to send acknowledge message, node left " +
+ "[msg=" + msg + ", node=" + node + ']');
+ }
+ catch (IgniteCheckedException e) {
+ IgniteLogger log = ctx.log(getClass());
+
+ U.error(log, "Failed to send acknowledge message " +
+ "[msg=" + msg + ", node=" + node + ']', e);
+ }
+ }
+ }
+ }
+ });
+ }
+ }
+
+ /** {@inheritDoc} */
@Nullable @Override public Object orderedTopic() {
return topic;
}
@@ -471,6 +1107,93 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
return ctx.cache().<K, V>context().cacheContext(cacheId);
}
+ /** */
+ private static class AcknowledgeBuffer {
+ /** */
+ private int size;
+
+ /** */
+ @GridToStringInclude
+ private Map<Integer, Long> updateCntrs = new HashMap<>();
+
+ /** */
+ @GridToStringInclude
+ private Set<AffinityTopologyVersion> topVers = U.newHashSet(1);
+
+ /**
+ * @param batch Batch.
+ * @return Non-null tuple if acknowledge should be sent to backups.
+ */
+ @SuppressWarnings("unchecked")
+ @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>>
+ onAcknowledged(GridContinuousBatch batch) {
+ size += batch.size();
+
+ Collection<CacheContinuousQueryEntry> entries = (Collection)batch.collect();
+
+ for (CacheContinuousQueryEntry e : entries)
+ addEntry(e);
+
+ return size >= BACKUP_ACK_THRESHOLD ? acknowledgeData() : null;
+ }
+
+ /**
+ * @param e Entry.
+ * @return Non-null tuple if acknowledge should be sent to backups.
+ */
+ @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>>
+ onAcknowledged(CacheContinuousQueryEntry e) {
+ size++;
+
+ addEntry(e);
+
+ return size >= BACKUP_ACK_THRESHOLD ? acknowledgeData() : null;
+ }
+
+ /**
+ * @param e Entry.
+ */
+ private void addEntry(CacheContinuousQueryEntry e) {
+ topVers.add(e.topologyVersion());
+
+ Long cntr0 = updateCntrs.get(e.partition());
+
+ if (cntr0 == null || e.updateCounter() > cntr0)
+ updateCntrs.put(e.partition(), e.updateCounter());
+ }
+
+ /**
+ * @return Non-null tuple if acknowledge should be sent to backups.
+ */
+ @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>>
+ acknowledgeOnTimeout() {
+ return size > 0 ? acknowledgeData() : null;
+ }
+
+ /**
+ * @return Tuple with acknowledge information.
+ */
+ private IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> acknowledgeData() {
+ assert size > 0;
+
+ Map<Integer, Long> cntrs = new HashMap<>(updateCntrs);
+
+ IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> res =
+ new IgniteBiTuple<>(cntrs, topVers);
+
+ topVers = U.newHashSet(1);
+
+ size = 0;
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(AcknowledgeBuffer.class, this);
+ }
+ }
+
/**
* Deployable object.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index a3c19a9..8342acf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -17,6 +17,10 @@
package org.apache.ignite.internal.processors.cache.query.continuous;
+import java.util.Map;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+
/**
* Continuous query listener.
*/
@@ -41,6 +45,37 @@ interface CacheContinuousQueryListener<K, V> {
public void onUnregister();
/**
+ * Cleans backup queue.
+ *
+ * @param updateCntrs Update counters map. NOTE(review): presumably keyed by partition ID, with the value
+ *      being the update counter up to which queued entries can be dropped; confirm against implementations.
+ */
+ public void cleanupBackupQueue(Map<Integer, Long> updateCntrs);
+
+ /**
+ * Flushes backup queue.
+ *
+ * NOTE(review): where the flushed entries go and when this is invoked is not visible from this
+ * interface; confirm against implementations before relying on it.
+ *
+ * @param ctx Context.
+ * @param topVer Topology version.
+ */
+ public void flushBackupQueue(GridKernalContext ctx, AffinityTopologyVersion topVer);
+
+ /**
+ * NOTE(review): summary was missing. Judging by the name, this triggers a timeout-driven acknowledge
+ * towards backups; confirm against implementations.
+ *
+ * @param ctx Context.
+ */
+ public void acknowledgeBackupOnTimeout(GridKernalContext ctx);
+
+ /**
+ * NOTE(review): summary was missing. Judging by the name, this tells the listener that the given update
+ * event is skipped rather than delivered; confirm against implementations.
+ *
+ * @param evt Event.
+ * @param topVer Topology version.
+ */
+ public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer);
+
+ /**
+ * Callback for a partition eviction, going by the method name. NOTE(review): what the listener is
+ * expected to do in response is not visible here; confirm against implementations.
+ *
+ * @param part Partition.
+ */
+ public void onPartitionEvicted(int part);
+
+ /**
+ * Reports whether this listener requires the old value.
+ *
* @return Whether old value is required.
*/
public boolean oldValueRequired();