You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/29 14:32:27 UTC
[18/22] ignite git commit: Continuous queries fixes: - flush backup
queue on exchange end (otherwise we don't really wait for all current
operations) - on coordinator apply counters after all single messages
received (otherwise extra counter increments a
http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index e7706dd..e5347c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -24,17 +24,12 @@ import java.io.ObjectOutput;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.NavigableSet;
import java.util.Set;
-import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryUpdatedListener;
@@ -42,23 +37,22 @@ import javax.cache.event.EventType;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.CacheQueryExecutedEvent;
import org.apache.ignite.events.CacheQueryReadEvent;
import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.IgniteDeploymentCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
-import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
-import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryLocalListener;
@@ -67,9 +61,6 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
import org.apache.ignite.internal.processors.continuous.GridContinuousQueryBatch;
import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryFilter;
-import org.apache.ignite.internal.util.GridConcurrentSkipListSet;
-import org.apache.ignite.internal.util.GridLongList;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
@@ -81,7 +72,6 @@ import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentLinkedDeque8;
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
@@ -94,7 +84,12 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
private static final long serialVersionUID = 0L;
/** */
- private static final int BACKUP_ACK_THRESHOLD = 100;
+ static final int BACKUP_ACK_THRESHOLD =
+ IgniteSystemProperties.getInteger("IGNITE_CONTINUOUS_QUERY_BACKUP_ACK_THRESHOLD", 100);
+
+ /** */
+ static final int LSNR_MAX_BUF_SIZE =
+ IgniteSystemProperties.getInteger("IGNITE_CONTINUOUS_QUERY_LISTENER_MAX_BUFFER_SIZE", 10_000);
/** Cache name. */
private String cacheName;
@@ -109,7 +104,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
private CacheEntryEventSerializableFilter<K, V> rmtFilter;
/** Deployable object for filter. */
- private DeployableObject rmtFilterDep;
+ private CacheContinuousQueryDeployableObject rmtFilterDep;
/** Internal flag. */
private boolean internal;
@@ -132,9 +127,6 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
/** Whether to skip primary check for REPLICATED cache. */
private transient boolean skipPrimaryCheck;
- /** Backup queue. */
- private transient volatile Collection<CacheContinuousQueryEntry> backupQueue;
-
/** */
private boolean locCache;
@@ -142,13 +134,13 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
private boolean keepBinary;
/** */
- private transient ConcurrentMap<Integer, PartitionRecovery> rcvs;
+ private transient ConcurrentMap<Integer, CacheContinuousQueryPartitionRecovery> rcvs;
/** */
- private transient ConcurrentMap<Integer, EntryBuffer> entryBufs;
+ private transient ConcurrentMap<Integer, CacheContinuousQueryEventBuffer> entryBufs;
/** */
- private transient AcknowledgeBuffer ackBuf;
+ private transient CacheContinuousQueryAcknowledgeBuffer ackBuf;
/** */
private transient int cacheId;
@@ -163,6 +155,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
private transient volatile AffinityTopologyVersion initTopVer;
/** */
+ private transient volatile boolean nodeLeft;
+
+ /** */
private transient boolean ignoreClsNotFound;
/** */
@@ -337,9 +332,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
entryBufs = new ConcurrentHashMap<>();
- backupQueue = new ConcurrentLinkedDeque8<>();
-
- ackBuf = new AcknowledgeBuffer();
+ ackBuf = new CacheContinuousQueryAcknowledgeBuffer();
rcvs = new ConcurrentHashMap<>();
@@ -409,7 +402,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
ctx.asyncCallbackPool().execute(clsr, evt.partitionId());
}
else {
- final boolean notify = filter(evt, primary);
+ final boolean notify = filter(evt);
if (log.isDebugEnabled())
log.debug("Filter invoked for event [evt=" + evt + ", primary=" + primary
@@ -429,6 +422,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
}, sync);
}
}
+ else
+ handleBackupEntry(cctx, evt.entry());
}
}
@@ -438,50 +433,38 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
}
@Override public void cleanupBackupQueue(Map<Integer, Long> updateCntrs) {
- Collection<CacheContinuousQueryEntry> backupQueue0 = backupQueue;
-
- if (backupQueue0 != null) {
- Iterator<CacheContinuousQueryEntry> it = backupQueue0.iterator();
-
- while (it.hasNext()) {
- CacheContinuousQueryEntry backupEntry = it.next();
+ for (Map.Entry<Integer, Long> e : updateCntrs.entrySet()) {
+ CacheContinuousQueryEventBuffer buf = entryBufs.get(e.getKey());
- Long updateCntr = updateCntrs.get(backupEntry.partition());
-
- if (updateCntr != null && backupEntry.updateCounter() <= updateCntr)
- it.remove();
- }
+ if (buf != null)
+ buf.cleanupBackupQueue(e.getValue());
}
}
@Override public void flushBackupQueue(GridKernalContext ctx, AffinityTopologyVersion topVer) {
assert topVer != null;
- Collection<CacheContinuousQueryEntry> backupQueue0 = backupQueue;
+ try {
+ GridCacheContext<K, V> cctx = cacheContext(ctx);
- if (backupQueue0 == null)
- return;
+ ClusterNode node = ctx.discovery().node(nodeId);
- try {
- ClusterNode nodeId0 = ctx.discovery().node(nodeId);
+ for (Map.Entry<Integer, CacheContinuousQueryEventBuffer> bufE : entryBufs.entrySet()) {
+ CacheContinuousQueryEventBuffer buf = bufE.getValue();
- if (nodeId0 != null) {
- GridCacheContext<K, V> cctx = cacheContext(ctx);
+ Collection<CacheContinuousQueryEntry> backupQueue = buf.flushOnExchange();
- for (CacheContinuousQueryEntry e : backupQueue0) {
- if (!e.isFiltered())
- prepareEntry(cctx, nodeId, e);
+ if (backupQueue != null && node != null) {
+ for (CacheContinuousQueryEntry e : backupQueue) {
+ e.markBackup();
- e.topologyVersion(topVer);
- }
+ if (!e.isFiltered())
+ prepareEntry(cctx, nodeId, e);
+ }
- ctx.continuous().addBackupNotification(nodeId, routineId, backupQueue0, topic);
+ ctx.continuous().addBackupNotification(nodeId, routineId, backupQueue, topic);
+ }
}
- else
- // Node which start CQ leave topology. Not needed to put data to backup queue.
- backupQueue = null;
-
- backupQueue0.clear();
}
catch (IgniteCheckedException e) {
U.error(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY),
@@ -505,14 +488,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
}
@Override public void onPartitionEvicted(int part) {
- Collection<CacheContinuousQueryEntry> backupQueue0 = backupQueue;
-
- if (backupQueue0 != null) {
- for (Iterator<CacheContinuousQueryEntry> it = backupQueue0.iterator(); it.hasNext(); ) {
- if (it.next().partition() == part)
- it.remove();
- }
- }
+ entryBufs.remove(part);
}
@Override public boolean oldValueRequired() {
@@ -739,17 +715,16 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
Collections.<CacheEntryEvent<? extends K, ? extends V>>emptyList();
}
- PartitionRecovery rec = getOrCreatePartitionRecovery(ctx, e.partition(), e.topologyVersion());
+ CacheContinuousQueryPartitionRecovery rec = getOrCreatePartitionRecovery(ctx, e.partition(), e.topologyVersion());
return rec.collectEntries(e, cctx, cache);
}
/**
- * @param primary Primary.
* @param evt Query event.
* @return {@code True} if event passed filter otherwise {@code false}.
*/
- public boolean filter(CacheContinuousQueryEvent evt, boolean primary) {
+ public boolean filter(CacheContinuousQueryEvent evt) {
CacheContinuousQueryEntry entry = evt.entry();
boolean notify = !entry.isFiltered();
@@ -765,15 +740,6 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
if (!notify)
entry.markFiltered();
- if (!primary && !internal && entry.updateCounter() != -1L /* Skip init query and expire entries */) {
- entry.markBackup();
-
- Collection<CacheContinuousQueryEntry> backupQueue0 = backupQueue;
-
- if (backupQueue0 != null)
- backupQueue0.add(entry.forBackupQueue());
- }
-
return notify;
}
@@ -811,13 +777,13 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
if (!entry.isFiltered())
prepareEntry(cctx, nodeId, entry);
- CacheContinuousQueryEntry e = handleEntry(entry);
+ Object entryOrList = handleEntry(cctx, entry);
- if (e != null) {
+ if (entryOrList != null) {
if (log.isDebugEnabled())
- log.debug("Send the following event to listener: " + e);
+ log.debug("Send the following event to listener: " + entryOrList);
- ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true);
+ ctx.continuous().addNotification(nodeId, routineId, entryOrList, topic, sync, true);
}
}
}
@@ -865,7 +831,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
if (internal)
return;
- for (PartitionRecovery rec : rcvs.values())
+ for (CacheContinuousQueryPartitionRecovery rec : rcvs.values())
rec.resetTopologyCache();
}
@@ -875,12 +841,12 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
* @param topVer Topology version for current operation.
* @return Partition recovery.
*/
- @NotNull private PartitionRecovery getOrCreatePartitionRecovery(GridKernalContext ctx,
+ @NotNull private CacheContinuousQueryPartitionRecovery getOrCreatePartitionRecovery(GridKernalContext ctx,
int partId,
AffinityTopologyVersion topVer) {
assert topVer != null && topVer.topologyVersion() > 0 : topVer;
- PartitionRecovery rec = rcvs.get(partId);
+ CacheContinuousQueryPartitionRecovery rec = rcvs.get(partId);
if (rec == null) {
T2<Long, Long> partCntrs = null;
@@ -905,10 +871,10 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
else if (initUpdCntrs != null)
partCntrs = initUpdCntrs.get(partId);
- rec = new PartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), topVer,
+ rec = new CacheContinuousQueryPartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), topVer,
partCntrs != null ? partCntrs.get2() : null);
- PartitionRecovery oldRec = rcvs.putIfAbsent(partId, rec);
+ CacheContinuousQueryPartitionRecovery oldRec = rcvs.putIfAbsent(partId, rec);
if (oldRec != null)
rec = oldRec;
@@ -918,10 +884,24 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
}
/**
+ * @param cctx Cache context.
+ * @param e Entry.
+ */
+ private void handleBackupEntry(final GridCacheContext cctx, CacheContinuousQueryEntry e) {
+ if (internal || e.updateCounter() == -1L || nodeLeft) // Skip internal query and expire entries.
+ return;
+
+ CacheContinuousQueryEventBuffer buf = partitionBuffer(cctx, e.partition());
+
+ buf.processEntry(e.copyWithDataReset(), true);
+ }
+
+ /**
+ * @param cctx Cache context.
* @param e Entry.
* @return Entry.
*/
- private CacheContinuousQueryEntry handleEntry(CacheContinuousQueryEntry e) {
+ private Object handleEntry(final GridCacheContext cctx, CacheContinuousQueryEntry e) {
assert e != null;
assert entryBufs != null;
@@ -934,354 +914,52 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
// Initial query entry.
// These events should be fired immediately.
- if (e.updateCounter() == -1)
+ if (e.updateCounter() == -1L)
return e;
- EntryBuffer buf = entryBufs.get(e.partition());
+ CacheContinuousQueryEventBuffer buf = partitionBuffer(cctx, e.partition());
- if (buf == null) {
- buf = new EntryBuffer();
-
- EntryBuffer oldRec = entryBufs.putIfAbsent(e.partition(), buf);
-
- if (oldRec != null)
- buf = oldRec;
- }
-
- return buf.handle(e);
+ return buf.processEntry(e, false);
}
/**
- *
+ * @param cctx Cache context.
+ * @param part Partition.
+ * @return Event buffer.
*/
- private static class PartitionRecovery {
- /** Event which means hole in sequence. */
- private static final CacheContinuousQueryEntry HOLE = new CacheContinuousQueryEntry();
-
- /** */
- private final static int MAX_BUFF_SIZE = 100;
-
- /** */
- private IgniteLogger log;
-
- /** */
- private long lastFiredEvt;
-
- /** */
- private AffinityTopologyVersion curTop = AffinityTopologyVersion.NONE;
-
- /** */
- private final Map<Long, CacheContinuousQueryEntry> pendingEvts = new TreeMap<>();
-
- /**
- * @param log Logger.
- * @param topVer Topology version.
- * @param initCntr Update counters.
- */
- PartitionRecovery(IgniteLogger log, AffinityTopologyVersion topVer, @Nullable Long initCntr) {
- this.log = log;
-
- if (initCntr != null) {
- assert topVer.topologyVersion() > 0 : topVer;
-
- this.lastFiredEvt = initCntr;
-
- curTop = topVer;
- }
- }
-
- /**
- * Resets cached topology.
- */
- void resetTopologyCache() {
- curTop = AffinityTopologyVersion.NONE;
- }
-
- /**
- * Add continuous entry.
- *
- * @param cctx Cache context.
- * @param cache Cache.
- * @param entry Cache continuous query entry.
- * @return Collection entries which will be fired. This collection should contains only non-filtered events.
- */
- <K, V> Collection<CacheEntryEvent<? extends K, ? extends V>> collectEntries(
- CacheContinuousQueryEntry entry,
- GridCacheContext cctx,
- IgniteCache cache
- ) {
- assert entry != null;
-
- if (entry.topologyVersion() == null) { // Possible if entry is sent from old node.
- assert entry.updateCounter() == 0L : entry;
-
- return F.<CacheEntryEvent<? extends K, ? extends V>>
- asList(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry));
- }
-
- List<CacheEntryEvent<? extends K, ? extends V>> entries;
-
- synchronized (pendingEvts) {
- if (log.isDebugEnabled()) {
- log.debug("Handling event [lastFiredEvt=" + lastFiredEvt +
- ", curTop=" + curTop +
- ", entUpdCnt=" + entry.updateCounter() +
- ", partId=" + entry.partition() +
- ", pendingEvts=" + pendingEvts + ']');
- }
-
- // Received first event.
- if (curTop == AffinityTopologyVersion.NONE) {
- lastFiredEvt = entry.updateCounter();
-
- curTop = entry.topologyVersion();
-
- if (log.isDebugEnabled()) {
- log.debug("First event [lastFiredEvt=" + lastFiredEvt +
- ", curTop=" + curTop +
- ", entUpdCnt=" + entry.updateCounter() +
- ", partId=" + entry.partition() + ']');
- }
-
- return !entry.isFiltered() ?
- F.<CacheEntryEvent<? extends K, ? extends V>>
- asList(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry)) :
- Collections.<CacheEntryEvent<? extends K, ? extends V>>emptyList();
- }
-
- if (curTop.compareTo(entry.topologyVersion()) < 0) {
- if (entry.updateCounter() == 1L && !entry.isBackup()) {
- entries = new ArrayList<>(pendingEvts.size());
-
- for (CacheContinuousQueryEntry evt : pendingEvts.values()) {
- if (evt != HOLE && !evt.isFiltered())
- entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, evt));
- }
-
- pendingEvts.clear();
-
- curTop = entry.topologyVersion();
-
- lastFiredEvt = entry.updateCounter();
-
- if (!entry.isFiltered())
- entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry));
+ private CacheContinuousQueryEventBuffer partitionBuffer(final GridCacheContext cctx, int part) {
+ CacheContinuousQueryEventBuffer buf = entryBufs.get(part);
- if (log.isDebugEnabled())
- log.debug("Partition was lost [lastFiredEvt=" + lastFiredEvt +
- ", curTop=" + curTop +
- ", entUpdCnt=" + entry.updateCounter() +
- ", partId=" + entry.partition() +
- ", pendingEvts=" + pendingEvts + ']');
-
- return entries;
- }
-
- curTop = entry.topologyVersion();
- }
-
- // Check duplicate.
- if (entry.updateCounter() > lastFiredEvt) {
- pendingEvts.put(entry.updateCounter(), entry);
-
- // Put filtered events.
- if (entry.filteredEvents() != null) {
- for (long cnrt : entry.filteredEvents()) {
- if (cnrt > lastFiredEvt)
- pendingEvts.put(cnrt, HOLE);
- }
- }
- }
- else {
- if (log.isDebugEnabled())
- log.debug("Skip duplicate continuous query message: " + entry);
-
- return Collections.emptyList();
- }
-
- if (pendingEvts.isEmpty()) {
- if (log.isDebugEnabled()) {
- log.debug("Nothing sent to listener [lastFiredEvt=" + lastFiredEvt +
- ", curTop=" + curTop +
- ", entUpdCnt=" + entry.updateCounter() +
- ", partId=" + entry.partition() + ']');
- }
-
- return Collections.emptyList();
- }
-
- Iterator<Map.Entry<Long, CacheContinuousQueryEntry>> iter = pendingEvts.entrySet().iterator();
-
- entries = new ArrayList<>();
-
- if (pendingEvts.size() >= MAX_BUFF_SIZE) {
- for (int i = 0; i < MAX_BUFF_SIZE - (MAX_BUFF_SIZE / 10); i++) {
- Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next();
-
- if (e.getValue() != HOLE && !e.getValue().isFiltered())
- entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, e.getValue()));
-
- lastFiredEvt = e.getKey();
-
- iter.remove();
- }
-
- if (log.isDebugEnabled()) {
- log.debug("Pending events reached max of buffer size [lastFiredEvt=" + lastFiredEvt +
- ", curTop=" + curTop +
- ", entUpdCnt=" + entry.updateCounter() +
- ", partId=" + entry.partition() +
- ", pendingEvts=" + pendingEvts + ']');
- }
- }
- else {
- // Elements are consistently.
- while (iter.hasNext()) {
- Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next();
-
- if (e.getKey() == lastFiredEvt + 1) {
- ++lastFiredEvt;
+ if (buf == null) {
+ buf = new CacheContinuousQueryEventBuffer(part) {
+ @Override protected long currentPartitionCounter() {
+ GridDhtLocalPartition locPart = cctx.topology().localPartition(part, null, false);
- if (e.getValue() != HOLE && !e.getValue().isFiltered())
- entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, e.getValue()));
+ if (locPart == null)
+ return -1L;
- iter.remove();
- }
- else
- break;
- }
+ return locPart.updateCounter();
}
- }
-
- if (log.isDebugEnabled()) {
- log.debug("Will send to listener the following events [entries=" + entries +
- ", lastFiredEvt=" + lastFiredEvt +
- ", curTop=" + curTop +
- ", entUpdCnt=" + entry.updateCounter() +
- ", partId=" + entry.partition() +
- ", pendingEvts=" + pendingEvts + ']');
- }
-
- return entries;
- }
- }
-
- /**
- *
- */
- private static class EntryBuffer {
- /** */
- private final static int MAX_BUFF_SIZE = 100;
+ };
- /** */
- private final GridConcurrentSkipListSet<Long> buf = new GridConcurrentSkipListSet<>();
-
- /** */
- private AtomicLong lastFiredCntr = new AtomicLong();
-
- /**
- * @param newVal New value.
- * @return Old value if previous value less than new value otherwise {@code -1}.
- */
- private long updateFiredCounter(long newVal) {
- long prevVal = lastFiredCntr.get();
-
- while (prevVal < newVal) {
- if (lastFiredCntr.compareAndSet(prevVal, newVal))
- return prevVal;
- else
- prevVal = lastFiredCntr.get();
- }
+ CacheContinuousQueryEventBuffer oldBuf = entryBufs.putIfAbsent(part, buf);
- return prevVal >= newVal ? -1 : prevVal;
+ if (oldBuf != null)
+ buf = oldBuf;
}
- /**
- * Add continuous entry.
- *
- * @param e Cache continuous query entry.
- * @return Collection entries which will be fired.
- */
- public CacheContinuousQueryEntry handle(CacheContinuousQueryEntry e) {
- assert e != null;
-
- if (e.isFiltered()) {
- Long last = buf.lastx();
- Long first = buf.firstx();
-
- if (last != null && first != null && last - first >= MAX_BUFF_SIZE) {
- NavigableSet<Long> prevHoles = buf.subSet(first, true, last, true);
-
- GridLongList filteredEvts = new GridLongList((int)(last - first));
-
- int size = 0;
-
- Long cntr;
-
- while ((cntr = prevHoles.pollFirst()) != null) {
- filteredEvts.add(cntr);
-
- ++size;
- }
-
- filteredEvts.truncate(size, true);
-
- e.filteredEvents(filteredEvts);
-
- return e;
- }
-
- if (lastFiredCntr.get() > e.updateCounter() || e.updateCounter() == 1)
- return e;
- else {
- buf.add(e.updateCounter());
-
- // Double check. If another thread sent a event with counter higher than this event.
- if (lastFiredCntr.get() > e.updateCounter() && buf.contains(e.updateCounter())) {
- buf.remove(e.updateCounter());
-
- return e;
- }
- else
- return null;
- }
- }
- else {
- long prevVal = updateFiredCounter(e.updateCounter());
-
- if (prevVal == -1)
- return e;
- else {
- NavigableSet<Long> prevHoles = buf.subSet(prevVal, true, e.updateCounter(), true);
-
- GridLongList filteredEvts = new GridLongList((int)(e.updateCounter() - prevVal));
-
- int size = 0;
-
- Long cntr;
-
- while ((cntr = prevHoles.pollFirst()) != null) {
- filteredEvts.add(cntr);
-
- ++size;
- }
-
- filteredEvts.truncate(size, true);
-
- e.filteredEvents(filteredEvts);
-
- return e;
- }
- }
- }
+ return buf;
}
/** {@inheritDoc} */
@Override public void onNodeLeft() {
- Collection<CacheContinuousQueryEntry> backupQueue0 = backupQueue;
+ nodeLeft = true;
- if (backupQueue0 != null)
- backupQueue = null;
+ for (Map.Entry<Integer, CacheContinuousQueryEventBuffer> bufE : entryBufs.entrySet()) {
+ CacheContinuousQueryEventBuffer buf = bufE.getValue();
+
+ buf.flushOnExchange();
+ }
}
/** {@inheritDoc} */
@@ -1290,7 +968,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
assert ctx.config().isPeerClassLoadingEnabled();
if (rmtFilter != null && !U.isGrid(rmtFilter.getClass()))
- rmtFilterDep = new DeployableObject(rmtFilter, ctx);
+ rmtFilterDep = new CacheContinuousQueryDeployableObject(rmtFilter, ctx);
}
/** {@inheritDoc} */
@@ -1411,7 +1089,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
boolean b = in.readBoolean();
if (b)
- rmtFilterDep = (DeployableObject)in.readObject();
+ rmtFilterDep = (CacheContinuousQueryDeployableObject)in.readObject();
else
rmtFilter = (CacheEntryEventSerializableFilter<K, V>)in.readObject();
@@ -1436,95 +1114,6 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
return ctx.cache().<K, V>context().cacheContext(cacheId);
}
- /** */
- private static class AcknowledgeBuffer {
- /** */
- private int size;
-
- /** */
- @GridToStringInclude
- private Map<Integer, Long> updateCntrs = new HashMap<>();
-
- /** */
- @GridToStringInclude
- private Set<AffinityTopologyVersion> topVers = U.newHashSet(1);
-
- /**
- * @param batch Batch.
- * @return Non-null tuple if acknowledge should be sent to backups.
- */
- @SuppressWarnings("unchecked")
- @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>>
- onAcknowledged(GridContinuousBatch batch) {
- assert batch instanceof GridContinuousQueryBatch;
-
- size += ((GridContinuousQueryBatch)batch).entriesCount();
-
- Collection<CacheContinuousQueryEntry> entries = (Collection)batch.collect();
-
- for (CacheContinuousQueryEntry e : entries)
- addEntry(e);
-
- return size >= BACKUP_ACK_THRESHOLD ? acknowledgeData() : null;
- }
-
- /**
- * @param e Entry.
- * @return Non-null tuple if acknowledge should be sent to backups.
- */
- @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>>
- onAcknowledged(CacheContinuousQueryEntry e) {
- size++;
-
- addEntry(e);
-
- return size >= BACKUP_ACK_THRESHOLD ? acknowledgeData() : null;
- }
-
- /**
- * @param e Entry.
- */
- private void addEntry(CacheContinuousQueryEntry e) {
- topVers.add(e.topologyVersion());
-
- Long cntr0 = updateCntrs.get(e.partition());
-
- if (cntr0 == null || e.updateCounter() > cntr0)
- updateCntrs.put(e.partition(), e.updateCounter());
- }
-
- /**
- * @return Non-null tuple if acknowledge should be sent to backups.
- */
- @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>>
- acknowledgeOnTimeout() {
- return size > 0 ? acknowledgeData() : null;
- }
-
- /**
- * @return Tuple with acknowledge information.
- */
- private IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> acknowledgeData() {
- assert size > 0;
-
- Map<Integer, Long> cntrs = new HashMap<>(updateCntrs);
-
- IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> res =
- new IgniteBiTuple<>(cntrs, topVers);
-
- topVers = U.newHashSet(1);
-
- size = 0;
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(AcknowledgeBuffer.class, this);
- }
- }
-
/**
*
*/
@@ -1560,44 +1149,38 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
/** {@inheritDoc} */
@Override public void run() {
- final boolean notify = filter(evt, primary);
-
- if (!primary())
- return;
+ final boolean notify = filter(evt);
- if (fut == null) {
- onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt);
+ if (primary || skipPrimaryCheck) {
+ if (fut == null) {
+ onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt);
- return;
- }
+ return;
+ }
- if (fut.isDone()) {
- if (fut.error() != null)
- evt.entry().markFiltered();
+ if (fut.isDone()) {
+ if (fut.error() != null)
+ evt.entry().markFiltered();
- onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt);
- }
- else {
- fut.listen(new CI1<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> f) {
- if (f.error() != null)
- evt.entry().markFiltered();
-
- ctx.asyncCallbackPool().execute(new Runnable() {
- @Override public void run() {
- onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt);
- }
- }, evt.entry().partition());
- }
- });
+ onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt);
+ }
+ else {
+ fut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> f) {
+ if (f.error() != null)
+ evt.entry().markFiltered();
+
+ ctx.asyncCallbackPool().execute(new Runnable() {
+ @Override public void run() {
+ onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt);
+ }
+ }, evt.entry().partition());
+ }
+ });
+ }
}
- }
-
- /**
- * @return {@code True} if event fired on this node.
- */
- private boolean primary() {
- return primary || skipPrimaryCheck;
+ else
+ handleBackupEntry(cacheContext(ctx), evt.entry());
}
/** {@inheritDoc} */
@@ -1606,82 +1189,4 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
}
}
- /**
- * Deployable object.
- */
- protected static class DeployableObject implements Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Serialized object. */
- private byte[] bytes;
-
- /** Deployment class name. */
- private String clsName;
-
- /** Deployment info. */
- private GridDeploymentInfo depInfo;
-
- /**
- * Required by {@link Externalizable}.
- */
- public DeployableObject() {
- // No-op.
- }
-
- /**
- * @param obj Object.
- * @param ctx Kernal context.
- * @throws IgniteCheckedException In case of error.
- */
- protected DeployableObject(Object obj, GridKernalContext ctx) throws IgniteCheckedException {
- assert obj != null;
- assert ctx != null;
-
- Class cls = U.detectClass(obj);
-
- clsName = cls.getName();
-
- GridDeployment dep = ctx.deploy().deploy(cls, U.detectClassLoader(cls));
-
- if (dep == null)
- throw new IgniteDeploymentCheckedException("Failed to deploy object: " + obj);
-
- depInfo = new GridDeploymentInfoBean(dep);
-
- bytes = U.marshal(ctx, obj);
- }
-
- /**
- * @param nodeId Node ID.
- * @param ctx Kernal context.
- * @return Deserialized object.
- * @throws IgniteCheckedException In case of error.
- */
- <T> T unmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException {
- assert ctx != null;
-
- GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName,
- depInfo.userVersion(), nodeId, depInfo.classLoaderId(), depInfo.participants(), null);
-
- if (dep == null)
- throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName);
-
- return U.unmarshal(ctx, bytes, U.resolveClassLoader(dep.classLoader(), ctx.config()));
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- U.writeByteArray(out, bytes);
- U.writeString(out, clsName);
- out.writeObject(depInfo);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- bytes = U.readByteArray(in);
- clsName = U.readString(in);
- depInfo = (GridDeploymentInfo)in.readObject();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
index 7aef4dd..e48d22e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
@@ -44,7 +44,7 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan
private Factory<? extends CacheEntryEventFilter> rmtFilterFactory;
/** Deployable object for filter factory. */
- private DeployableObject rmtFilterFactoryDep;
+ private CacheContinuousQueryDeployableObject rmtFilterFactoryDep;
/** Event types for JCache API. */
private byte types;
@@ -122,7 +122,7 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan
super.p2pMarshal(ctx);
if (rmtFilterFactory != null && !U.isGrid(rmtFilterFactory.getClass()))
- rmtFilterFactoryDep = new DeployableObject(rmtFilterFactory, ctx);
+ rmtFilterFactoryDep = new CacheContinuousQueryDeployableObject(rmtFilterFactory, ctx);
}
/** {@inheritDoc} */
@@ -167,7 +167,7 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan
boolean b = in.readBoolean();
if (b)
- rmtFilterFactoryDep = (DeployableObject)in.readObject();
+ rmtFilterFactoryDep = (CacheContinuousQueryDeployableObject)in.readObject();
else
rmtFilterFactory = (Factory)in.readObject();
http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index acf351f..1a655e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -193,7 +193,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
lsnr.keepBinary(),
partId,
updCntr,
- topVer);
+ topVer,
+ (byte)0);
CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
@@ -339,7 +340,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
lsnr.keepBinary(),
partId,
updateCntr,
- topVer);
+ topVer,
+ (byte)0);
IgniteCacheProxy jcache = cctx.kernalContext().cache().jcacheProxy(cctx.name());
@@ -400,7 +402,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
lsnr.keepBinary(),
e.partition(),
-1,
- null);
+ null,
+ (byte)0);
CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent(
cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
@@ -568,9 +571,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
}
/**
- * @param topVer Topology version.
+ * @param topVer Finished exchange topology version.
*/
- public void beforeExchange(AffinityTopologyVersion topVer) {
+ public void flushBackupQueue(AffinityTopologyVersion topVer) {
for (CacheContinuousQueryListener lsnr : lsnrs.values())
lsnr.flushBackupQueue(cctx.kernalContext(), topVer);
}
@@ -703,7 +706,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
keepBinary,
0,
-1,
- null);
+ null,
+ (byte)0);
next = new CacheContinuousQueryEvent<>(
cctx.kernalContext().cache().jcache(cctx.name()),
http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java
new file mode 100644
index 0000000..e210c24
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import javax.cache.event.CacheEntryEvent;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.LT;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Per-partition buffer that re-orders continuous query entries by update counter before they are
+ * handed to the listener.
+ * <p>
+ * {@link #collectEntries} remembers the counter of the last released entry and keeps entries that
+ * arrive ahead of it in a sorted map until the gap before them is closed. Duplicates (counter not
+ * greater than the last released one) are dropped. {@link #collectEntries} updates its state under the
+ * monitor of the pending map; {@link #resetTopologyCache()} does not synchronize.
+ */
+class CacheContinuousQueryPartitionRecovery {
+    /** Event which means hole in sequence. */
+    private static final CacheContinuousQueryEntry HOLE;
+
+    static {
+        HOLE = new CacheContinuousQueryEntry();
+
+        HOLE.markFiltered();
+    }
+
+    /** Number of pending entries at which the buffer is force-flushed. */
+    private static final int MAX_BUFF_SIZE = CacheContinuousQueryHandler.LSNR_MAX_BUF_SIZE;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Update counter of the last entry released from this buffer. */
+    private long lastFiredEvt;
+
+    /** Topology version of the latest accepted entry, {@code NONE} until the first entry is seen. */
+    private AffinityTopologyVersion curTop = AffinityTopologyVersion.NONE;
+
+    /** Entries waiting for preceding counters, keyed by update counter. */
+    private final TreeMap<Long, CacheContinuousQueryEntry> pendingEvts = new TreeMap<>();
+
+    /**
+     * @param log Logger.
+     * @param topVer Topology version, used only when {@code initCntr} is not {@code null}.
+     * @param initCntr Initial update counter, or {@code null} to take it from the first received entry.
+     */
+    CacheContinuousQueryPartitionRecovery(IgniteLogger log, AffinityTopologyVersion topVer, @Nullable Long initCntr) {
+        this.log = log;
+
+        if (initCntr != null) {
+            assert topVer.topologyVersion() > 0 : topVer;
+
+            this.lastFiredEvt = initCntr;
+
+            curTop = topVer;
+        }
+    }
+
+    /**
+     * Resets cached topology, so the next entry is handled as the first one.
+     */
+    void resetTopologyCache() {
+        curTop = AffinityTopologyVersion.NONE;
+    }
+
+    /**
+     * Adds a continuous query entry and returns the events that became ready to be fired.
+     *
+     * @param entry Cache continuous query entry.
+     * @param cctx Cache context.
+     * @param cache Cache.
+     * @return Events to fire, in update counter order. The collection contains only non-filtered events
+     *      and is empty if the entry is a duplicate or has to wait for preceding counters.
+     */
+    <K, V> Collection<CacheEntryEvent<? extends K, ? extends V>> collectEntries(
+        CacheContinuousQueryEntry entry,
+        GridCacheContext cctx,
+        IgniteCache cache
+    ) {
+        assert entry != null;
+
+        if (entry.topologyVersion() == null) { // Possible if entry is sent from old node.
+            assert entry.updateCounter() == 0L : entry;
+
+            return F.<CacheEntryEvent<? extends K, ? extends V>>
+                asList(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry));
+        }
+
+        List<CacheEntryEvent<? extends K, ? extends V>> entries;
+
+        synchronized (pendingEvts) {
+            if (log.isDebugEnabled()) {
+                log.debug("Handling event [lastFiredEvt=" + lastFiredEvt +
+                    ", curTop=" + curTop +
+                    ", entUpdCnt=" + entry.updateCounter() +
+                    ", partId=" + entry.partition() +
+                    ", pendingEvts=" + pendingEvts + ']');
+            }
+
+            // Received first event.
+            if (curTop == AffinityTopologyVersion.NONE) {
+                lastFiredEvt = entry.updateCounter();
+
+                curTop = entry.topologyVersion();
+
+                if (log.isDebugEnabled()) {
+                    log.debug("First event [lastFiredEvt=" + lastFiredEvt +
+                        ", curTop=" + curTop +
+                        ", entUpdCnt=" + entry.updateCounter() +
+                        ", partId=" + entry.partition() + ']');
+                }
+
+                return !entry.isFiltered() ?
+                    F.<CacheEntryEvent<? extends K, ? extends V>>
+                        asList(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry)) :
+                    Collections.<CacheEntryEvent<? extends K, ? extends V>>emptyList();
+            }
+
+            if (curTop.compareTo(entry.topologyVersion()) < 0) {
+                // Counter restarted from 1 on a newer topology: release everything buffered so far
+                // and continue from the new entry.
+                if (entry.updateCounter() == 1L && !entry.isBackup()) {
+                    entries = new ArrayList<>(pendingEvts.size());
+
+                    for (CacheContinuousQueryEntry evt : pendingEvts.values()) {
+                        if (evt != HOLE && !evt.isFiltered())
+                            entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, evt));
+                    }
+
+                    pendingEvts.clear();
+
+                    curTop = entry.topologyVersion();
+
+                    lastFiredEvt = entry.updateCounter();
+
+                    if (!entry.isFiltered())
+                        entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry));
+
+                    if (log.isDebugEnabled())
+                        log.debug("Partition was lost [lastFiredEvt=" + lastFiredEvt +
+                            ", curTop=" + curTop +
+                            ", entUpdCnt=" + entry.updateCounter() +
+                            ", partId=" + entry.partition() +
+                            ", pendingEvts=" + pendingEvts + ']');
+
+                    return entries;
+                }
+
+                curTop = entry.topologyVersion();
+            }
+
+            // Check duplicate.
+            if (entry.updateCounter() > lastFiredEvt)
+                pendingEvts.put(entry.updateCounter(), entry);
+            else {
+                if (log.isDebugEnabled())
+                    log.debug("Skip duplicate continuous query message: " + entry);
+
+                return Collections.emptyList();
+            }
+
+            // The map holds at least the entry added above, so no emptiness check is needed here.
+            Iterator<Map.Entry<Long, CacheContinuousQueryEntry>> iter = pendingEvts.entrySet().iterator();
+
+            entries = new ArrayList<>();
+
+            if (pendingEvts.size() >= MAX_BUFF_SIZE) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Pending events reached max of buffer size [lastFiredEvt=" + lastFiredEvt +
+                        ", curTop=" + curTop +
+                        ", entUpdCnt=" + entry.updateCounter() +
+                        ", partId=" + entry.partition() +
+                        ", pendingEvts=" + pendingEvts + ']');
+                }
+
+                LT.warn(log, "Pending events reached max of buffer size [cache=" + cctx.name() +
+                    ", bufSize=" + MAX_BUFF_SIZE +
+                    ", partId=" + entry.partition() + ']');
+
+                // Buffer overflow: release the oldest entries regardless of gaps, keeping MAX_BUFF_SIZE / 10.
+                int flushCnt = MAX_BUFF_SIZE - (MAX_BUFF_SIZE / 10);
+
+                for (int i = 0; i < flushCnt; i++) {
+                    Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next();
+
+                    CacheContinuousQueryEntry pending = e.getValue();
+
+                    if (pending != HOLE && !pending.isFiltered())
+                        entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, pending));
+
+                    lastFiredEvt = e.getKey();
+
+                    iter.remove();
+                }
+            }
+            else {
+                boolean skippedFiltered = false;
+
+                while (iter.hasNext()) {
+                    Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next();
+
+                    CacheContinuousQueryEntry pending = e.getValue();
+
+                    // Presumably the number of filtered updates preceding this entry - verify against
+                    // CacheContinuousQueryEntry.
+                    long filtered = pending.filteredCount();
+
+                    boolean fire = e.getKey() == lastFiredEvt + 1;
+
+                    // The gap before this entry is closed if its filtered count reaches back to the
+                    // last released counter.
+                    if (!fire && filtered > 0)
+                        fire = e.getKey() - filtered <= lastFiredEvt + 1;
+
+                    if (fire) {
+                        lastFiredEvt = e.getKey();
+
+                        if (pending != HOLE && !pending.isFiltered())
+                            entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, pending));
+
+                        iter.remove();
+                    }
+                    else {
+                        // Filtered entries that cannot be released yet are stepped over; the scan stops
+                        // at the first non-filtered entry that still has a gap before it.
+                        if (pending.isFiltered())
+                            skippedFiltered = true;
+                        else
+                            break;
+                    }
+                }
+
+                // Drop stepped-over filtered entries whose counters are now below the last released one.
+                if (skippedFiltered)
+                    pendingEvts.headMap(lastFiredEvt).clear();
+            }
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Will send to listener the following events [entries=" + entries +
+                ", lastFiredEvt=" + lastFiredEvt +
+                ", curTop=" + curTop +
+                ", entUpdCnt=" + entry.updateCounter() +
+                ", partId=" + entry.partition() +
+                ", pendingEvts=" + pendingEvts + ']');
+        }
+
+        return entries;
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java
index 4540de1..597eae8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java
@@ -25,7 +25,7 @@ import org.jsr166.ConcurrentLinkedDeque8;
*/
public class GridContinuousBatchAdapter implements GridContinuousBatch {
/** Buffer. */
- private final ConcurrentLinkedDeque8<Object> buf = new ConcurrentLinkedDeque8<>();
+ protected final ConcurrentLinkedDeque8<Object> buf = new ConcurrentLinkedDeque8<>();
/** {@inheritDoc} */
@Override public void add(Object obj) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index abcd8ea..a72dcd6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -75,7 +75,6 @@ import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
@@ -872,10 +871,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
sendNotification(nodeId, routineId, null, toSnd, orderedTopic, true, null);
}
else {
- LocalRoutineInfo localRoutineInfo = locInfos.get(routineId);
+ LocalRoutineInfo locRoutineInfo = locInfos.get(routineId);
- if (localRoutineInfo != null)
- localRoutineInfo.handler().notifyCallback(nodeId, routineId, objs, ctx);
+ if (locRoutineInfo != null)
+ locRoutineInfo.handler().notifyCallback(nodeId, routineId, objs, ctx);
}
}
@@ -897,7 +896,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
throws IgniteCheckedException {
assert nodeId != null;
assert routineId != null;
- assert !msg || obj instanceof Message : obj;
+ assert !msg || (obj instanceof Message || obj instanceof Collection) : obj;
assert !nodeId.equals(ctx.localNodeId());
@@ -917,7 +916,13 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
syncMsgFuts.put(futId, fut);
try {
- sendNotification(nodeId, routineId, futId, F.asList(obj), null, msg, null);
+ sendNotification(nodeId,
+ routineId,
+ futId,
+ obj instanceof Collection ? (Collection)obj : F.asList(obj),
+ null,
+ msg,
+ null);
info.hnd.onBatchAcknowledged(routineId, info.add(obj), ctx);
}
@@ -1563,7 +1568,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
GridContinuousBatch addAll(Collection<?> objs) {
assert objs != null;
- GridContinuousBatch toSnd = null;
+ GridContinuousBatch toSnd;
lock.writeLock().lock();
http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousQueryBatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousQueryBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousQueryBatch.java
index c5d854b..0eba44b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousQueryBatch.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousQueryBatch.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.continuous;
+import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEntry;
@@ -31,11 +32,20 @@ public class GridContinuousQueryBatch extends GridContinuousBatchAdapter {
/** {@inheritDoc} */
@Override public void add(Object obj) {
assert obj != null;
- assert obj instanceof CacheContinuousQueryEntry;
+ assert obj instanceof CacheContinuousQueryEntry || obj instanceof List;
- super.add(obj);
+ if (obj instanceof CacheContinuousQueryEntry) {
+ buf.add(obj);
- size.addAndGet(((CacheContinuousQueryEntry)obj).size());
+ size.incrementAndGet();
+ }
+ else {
+ List<Object> objs = (List<Object>)obj;
+
+ buf.addAll(objs);
+
+ size.addAndGet(objs.size());
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverAtomicSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverAtomicSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverAtomicSelfTest.java
index 3cab9e0..d505d19 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverAtomicSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverAtomicSelfTest.java
@@ -25,7 +25,6 @@ import org.apache.ignite.cache.CacheMode;
*/
public class CacheContinuousQueryAsyncFailoverAtomicSelfTest
extends CacheContinuousQueryFailoverAbstractSelfTest {
-
/** {@inheritDoc} */
@Override protected CacheMode cacheMode() {
return CacheMode.PARTITIONED;
http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java
new file mode 100644
index 0000000..9c7c836
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Checks that a continuous query started from a client node receives exactly one event per update when
+ * several threads concurrently update keys mapped to the same partition, both when the query is started
+ * before the updates and when it is started while updates are in progress.
+ */
+public class CacheContinuousQueryConcurrentPartitionUpdateTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Client mode flag applied to the configuration of the next started node. */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        ((TcpDiscoverySpi) cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConcurrentUpdatePartitionAtomic() throws Exception {
+        concurrentUpdatePartition(ATOMIC);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConcurrentUpdatePartitionTx() throws Exception {
+        concurrentUpdatePartition(TRANSACTIONAL);
+    }
+
+    /**
+     * Starts the query first, then repeatedly runs concurrent updates of one partition and checks that
+     * the listener saw every update.
+     *
+     * @param atomicityMode Cache atomicity mode.
+     * @throws Exception If failed.
+     */
+    private void concurrentUpdatePartition(CacheAtomicityMode atomicityMode) throws Exception {
+        Ignite srv = startGrid(0);
+
+        client = true;
+
+        Ignite clientNode = startGrid(1);
+
+        CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
+
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setAtomicityMode(atomicityMode);
+
+        IgniteCache clientCache = clientNode.createCache(ccfg);
+
+        final AtomicInteger evtCnt = new AtomicInteger();
+
+        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+        qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+                for (CacheEntryEvent evt : evts) {
+                    assertNotNull(evt.getKey());
+                    assertNotNull(evt.getValue());
+
+                    evtCnt.incrementAndGet();
+                }
+            }
+        });
+
+        clientCache.query(qry);
+
+        Affinity<Integer> aff = srv.affinity(DEFAULT_CACHE_NAME);
+
+        final int KEYS = 10;
+
+        final List<Integer> keys = partitionKeys(aff, 0, KEYS);
+
+        assertEquals(KEYS, keys.size());
+
+        final int THREADS = 10;
+        final int UPDATES = 1000;
+
+        final IgniteCache<Object, Object> srvCache = srv.cache(DEFAULT_CACHE_NAME);
+
+        for (int iter = 0; iter < 15; iter++) {
+            log.info("Iteration: " + iter);
+
+            GridTestUtils.runMultiThreaded(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    for (int i = 0; i < UPDATES; i++)
+                        srvCache.put(keys.get(rnd.nextInt(KEYS)), i);
+
+                    return null;
+                }
+            }, THREADS, "update");
+
+            // Result is intentionally ignored: the assertion below reports the actual count on timeout.
+            GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    log.info("Events: " + evtCnt.get());
+
+                    return evtCnt.get() >= THREADS * UPDATES;
+                }
+            }, 5000);
+
+            assertEquals(THREADS * UPDATES, evtCnt.get());
+
+            evtCnt.set(0);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConcurrentUpdatesAndQueryStartAtomic() throws Exception {
+        concurrentUpdatesAndQueryStart(ATOMIC);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConcurrentUpdatesAndQueryStartTx() throws Exception {
+        concurrentUpdatesAndQueryStart(TRANSACTIONAL);
+    }
+
+    /**
+     * Starts the query while background threads are updating one partition with negative values, then
+     * runs a counted phase of updates with non-negative values and checks that the listener saw every
+     * update of that second phase.
+     *
+     * @param atomicityMode Cache atomicity mode.
+     * @throws Exception If failed.
+     */
+    private void concurrentUpdatesAndQueryStart(CacheAtomicityMode atomicityMode) throws Exception {
+        Ignite srv = startGrid(0);
+
+        client = true;
+
+        Ignite clientNode = startGrid(1);
+
+        CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
+
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setAtomicityMode(atomicityMode);
+
+        IgniteCache clientCache = clientNode.createCache(ccfg);
+
+        Affinity<Integer> aff = srv.affinity(DEFAULT_CACHE_NAME);
+
+        final int KEYS = 10;
+
+        final List<Integer> keys = partitionKeys(aff, 0, KEYS);
+
+        assertEquals(KEYS, keys.size());
+
+        final int THREADS = 10;
+        final int UPDATES = 1000;
+
+        for (int iter = 0; iter < 5; iter++) {
+            log.info("Iteration: " + iter);
+
+            ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+            final AtomicInteger evtCnt = new AtomicInteger();
+
+            qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
+                @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+                    for (CacheEntryEvent evt : evts) {
+                        assertNotNull(evt.getKey());
+                        assertNotNull(evt.getValue());
+
+                        // Background updates write negative values; only the counted phase is tallied.
+                        if ((Integer)evt.getValue() >= 0)
+                            evtCnt.incrementAndGet();
+                    }
+                }
+            });
+
+            QueryCursor cur = null;
+
+            final IgniteCache<Object, Object> srvCache = srv.cache(DEFAULT_CACHE_NAME);
+
+            final AtomicBoolean stop = new AtomicBoolean();
+
+            try {
+                try {
+                    IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+                        @Override public Void call() throws Exception {
+                            ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                            while (!stop.get())
+                                srvCache.put(keys.get(rnd.nextInt(KEYS)), rnd.nextInt(100) - 200);
+
+                            return null;
+                        }
+                    }, THREADS, "update");
+
+                    U.sleep(1000);
+
+                    cur = clientCache.query(qry);
+
+                    U.sleep(1000);
+
+                    stop.set(true);
+
+                    fut.get();
+                }
+                finally {
+                    stop.set(true);
+                }
+
+                // Same synchronous helper as in concurrentUpdatePartition(): the timeout below then
+                // covers only event delivery, and the update future is no longer dropped.
+                GridTestUtils.runMultiThreaded(new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                        for (int i = 0; i < UPDATES; i++)
+                            srvCache.put(keys.get(rnd.nextInt(KEYS)), i);
+
+                        return null;
+                    }
+                }, THREADS, "update");
+
+                // Result is intentionally ignored: the assertion below reports the actual count on timeout.
+                GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                    @Override public boolean apply() {
+                        log.info("Events: " + evtCnt.get());
+
+                        return evtCnt.get() >= THREADS * UPDATES;
+                    }
+                }, 5000);
+
+                assertEquals(THREADS * UPDATES, evtCnt.get());
+            }
+            finally {
+                // Close the query even if the iteration failed, so its listener does not outlive it.
+                if (cur != null)
+                    cur.close();
+            }
+        }
+    }
+
+    /**
+     * Collects the first integer keys from the range {@code [0, 100000)} that map to the given partition.
+     *
+     * @param aff Cache affinity.
+     * @param part Partition number.
+     * @param cnt Maximum number of keys to collect.
+     * @return Found keys, fewer than {@code cnt} if the scanned range does not contain enough of them.
+     */
+    private static List<Integer> partitionKeys(Affinity<Integer> aff, int part, int cnt) {
+        List<Integer> keys = new ArrayList<>(cnt);
+
+        for (int i = 0; i < 100_000 && keys.size() < cnt; i++) {
+            if (aff.partition(i) == part)
+                keys.add(i);
+        }
+
+        return keys;
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java
new file mode 100644
index 0000000..382f166
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CyclicBarrier;
+import javax.cache.event.EventType;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Feeds {@link CacheContinuousQueryEventBuffer} with shuffled entries, from one or several threads,
+ * and checks the update counter and filtered count of every entry the buffer hands back.
+ */
+@SuppressWarnings("unchecked")
+public class CacheContinuousQueryEventBufferTest extends GridCommonAbstractTest {
+    /** Filtered events ratios, each one is tried against its own fresh buffer. */
+    private static final float[] FILTER_RATIOS = {0.5f, 0.9f, 0.99f, 0.01f, 0.f};
+
+    /**
+     * Single-threaded run.
+     *
+     * @throws Exception If failed.
+     */
+    public void testBuffer1() throws Exception {
+        testBuffer(1);
+    }
+
+    /**
+     * Ten consecutive runs, each with 10 threads.
+     *
+     * @throws Exception If failed.
+     */
+    public void testBuffer2() throws Exception {
+        for (int iter = 0; iter < 10; iter++) {
+            log.info("Iteration: " + iter);
+
+            testBuffer(10);
+        }
+    }
+
+    /**
+     * Runs ten rounds where every ratio from {@link #FILTER_RATIOS} gets a fresh buffer and counters
+     * start at 1, then ten rounds with random ratios against one buffer with ever-growing counters.
+     *
+     * @param threads Threads number.
+     * @throws Exception If failed.
+     */
+    private void testBuffer(int threads) throws Exception {
+        long seed = System.nanoTime();
+
+        log.info("Start test, seed: " + seed);
+
+        Random rnd = new Random(seed);
+
+        for (int round = 0; round < 10; round++) {
+            int cnt = rnd.nextInt(10_000) + 1;
+
+            for (float ratio : FILTER_RATIOS)
+                testBuffer(rnd, new CacheContinuousQueryEventBuffer(0), cnt, 1, ratio, threads);
+        }
+
+        CacheContinuousQueryEventBuffer sharedBuf = new CacheContinuousQueryEventBuffer(0);
+
+        long nextCntr = 1;
+
+        for (int round = 0; round < 10; round++) {
+            int cnt = rnd.nextInt(10_000) + 1;
+            float ratio = rnd.nextFloat();
+
+            testBuffer(rnd, sharedBuf, cnt, nextCntr, ratio, threads);
+
+            nextCntr += cnt;
+        }
+    }
+
+    /**
+     * Builds {@code cnt} entries with consecutive counters starting from {@code cntr}, marks a random
+     * part of them as filtered, passes them shuffled to the buffer and compares the entries returned
+     * by the buffer (ordered by update counter when several threads are used) with the expected ones.
+     *
+     * @param rnd Random.
+     * @param b Buffer.
+     * @param cnt Entries count.
+     * @param cntr Current counter.
+     * @param filterRatio Filtered events ratio.
+     * @param threads Threads number.
+     * @throws Exception If failed.
+     */
+    private void testBuffer(Random rnd,
+        final CacheContinuousQueryEventBuffer b,
+        int cnt,
+        long cntr,
+        float filterRatio,
+        int threads)
+        throws Exception
+    {
+        List<CacheContinuousQueryEntry> input = new ArrayList<>();
+        List<CacheContinuousQueryEntry> expected = new ArrayList<>();
+
+        // Start from what the buffer already reports as filtered.
+        long pendingFiltered = b.currentFiltered();
+
+        for (long c = cntr, end = cntr + cnt; c < end; c++) {
+            CacheContinuousQueryEntry entry = newEntry(c);
+
+            input.add(entry);
+
+            if (rnd.nextFloat() < filterRatio) {
+                entry.markFiltered();
+
+                pendingFiltered++;
+
+                continue;
+            }
+
+            // A passing entry is expected to report the filtered entries accumulated before it.
+            CacheContinuousQueryEntry exp = newEntry(c);
+
+            exp.filteredCount(pendingFiltered);
+
+            expected.add(exp);
+
+            pendingFiltered = 0;
+        }
+
+        Collections.shuffle(input, rnd);
+
+        List<CacheContinuousQueryEntry> actual = new ArrayList<>(expected.size());
+
+        if (threads == 1) {
+            for (CacheContinuousQueryEntry entry : input) {
+                Object res = b.processEntry(entry, false);
+
+                // The result is handled as null, a single entry or a list of entries.
+                if (res instanceof CacheContinuousQueryEntry)
+                    actual.add((CacheContinuousQueryEntry)res);
+                else if (res != null)
+                    actual.addAll((List<CacheContinuousQueryEntry>)res);
+            }
+        }
+        else {
+            final CyclicBarrier startBarrier = new CyclicBarrier(threads);
+            final ConcurrentLinkedQueue<CacheContinuousQueryEntry> queue = new ConcurrentLinkedQueue<>(input);
+            final ConcurrentSkipListMap<Long, CacheContinuousQueryEntry> byCntr = new ConcurrentSkipListMap<>();
+
+            GridTestUtils.runMultiThreaded(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    // All threads wait for each other before they start draining the queue.
+                    startBarrier.await();
+
+                    for (CacheContinuousQueryEntry entry = queue.poll(); entry != null; entry = queue.poll()) {
+                        Object res = b.processEntry(entry, false);
+
+                        if (res instanceof CacheContinuousQueryEntry) {
+                            CacheContinuousQueryEntry single = (CacheContinuousQueryEntry)res;
+
+                            byCntr.put(single.updateCounter(), single);
+                        }
+                        else if (res != null) {
+                            for (CacheContinuousQueryEntry e : (List<CacheContinuousQueryEntry>)res)
+                                byCntr.put(e.updateCounter(), e);
+                        }
+                    }
+
+                    return null;
+                }
+            }, threads, "test");
+
+            // Results come from different threads; the sorted map yields them by update counter.
+            actual.addAll(byCntr.values());
+        }
+
+        assertEquals(expected.size(), actual.size());
+
+        int idx = 0;
+
+        for (CacheContinuousQueryEntry exp : expected) {
+            CacheContinuousQueryEntry act = actual.get(idx++);
+
+            assertEquals(exp.updateCounter(), act.updateCounter());
+            assertEquals(exp.filteredCount(), act.filteredCount());
+        }
+    }
+
+    /**
+     * @param cntr Update counter.
+     * @return New {@link EventType#CREATED} entry with the given update counter; every other
+     *      constructor argument is the same fixed value for all entries of this test.
+     */
+    private static CacheContinuousQueryEntry newEntry(long cntr) {
+        return new CacheContinuousQueryEntry(
+            0, EventType.CREATED, null, null, null, false,
+            0, cntr, null, (byte)0);
+    }
+}