You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/25 15:37:13 UTC
[10/18] ignite git commit: Merge branch 'ignite-5075-cc' into
ignite-5075-cc-debug
Merge branch 'ignite-5075-cc' into ignite-5075-cc-debug
# Conflicts:
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/529dec10
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/529dec10
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/529dec10
Branch: refs/heads/ignite-5075-cc-debug
Commit: 529dec1018290fb9d05890150f7dd7ca410902e5
Parents: 8ee752d
Author: sboikov <sb...@gridgain.com>
Authored: Thu May 25 13:21:12 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu May 25 13:21:12 2017 +0300
----------------------------------------------------------------------
.../CacheContinuousQueryEventBuffer.java | 11 -
.../continuous/CacheContinuousQueryHandler.java | 259 +------------------
.../CacheContinuousQueryPartitionRecovery.java | 48 +++-
3 files changed, 46 insertions(+), 272 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/529dec10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
index 6295b0b..59b92eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
@@ -87,17 +87,6 @@ public class CacheContinuousQueryEventBuffer {
return null;
}
- /** */
- private final int part;
-
- public CacheContinuousQueryEventBuffer() {
- part = 0;
- }
-
- public CacheContinuousQueryEventBuffer(int part) {
- this.part = part;
- }
-
/**
* @return Initial partition counter.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/529dec10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 2f88827..72fdd83 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -947,264 +947,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
buf = oldBuf;
}
- return buf.processEntry(e);
- }
-
- /**
- *
- */
- private static class PartitionRecovery {
- /** Event which means hole in sequence. */
- private static final CacheContinuousQueryEntry HOLE = new CacheContinuousQueryEntry();
-
- /** */
- private final static int MAX_BUFF_SIZE = LSNR_MAX_BUF_SIZE;
-
- /** */
- private IgniteLogger log;
-
- /** */
- private long lastFiredEvt;
-
- /** */
- private AffinityTopologyVersion curTop = AffinityTopologyVersion.NONE;
-
- /** */
- private final Map<Long, CacheContinuousQueryEntry> pendingEvts = new TreeMap<>();
-
- /**
- * @param log Logger.
- * @param topVer Topology version.
- * @param initCntr Update counters.
- */
- PartitionRecovery(IgniteLogger log, AffinityTopologyVersion topVer, @Nullable Long initCntr) {
- this.log = log;
-
- if (initCntr != null) {
- assert topVer.topologyVersion() > 0 : topVer;
-
- this.lastFiredEvt = initCntr;
-
- curTop = topVer;
- }
- }
-
- /**
- * Resets cached topology.
- */
- void resetTopologyCache() {
- curTop = AffinityTopologyVersion.NONE;
- }
-
- /**
- * Add continuous entry.
- *
- * @param cctx Cache context.
- * @param cache Cache.
- * @param entry Cache continuous query entry.
- * @return Collection entries which will be fired. This collection should contains only non-filtered events.
- */
- <K, V> Collection<CacheEntryEvent<? extends K, ? extends V>> collectEntries(
- CacheContinuousQueryEntry entry,
- GridCacheContext cctx,
- IgniteCache cache
- ) {
- assert entry != null;
-
- if (entry.topologyVersion() == null) { // Possible if entry is sent from old node.
- assert entry.updateCounter() == 0L : entry;
-
- return F.<CacheEntryEvent<? extends K, ? extends V>>
- asList(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry));
- }
-
- List<CacheEntryEvent<? extends K, ? extends V>> entries;
-
- synchronized (pendingEvts) {
- if (log.isDebugEnabled()) {
- log.debug("Handling event [lastFiredEvt=" + lastFiredEvt +
- ", curTop=" + curTop +
- ", entUpdCnt=" + entry.updateCounter() +
- ", partId=" + entry.partition() +
- ", pendingEvts=" + pendingEvts + ']');
- }
-
- // Received first event.
- if (curTop == AffinityTopologyVersion.NONE) {
- lastFiredEvt = entry.updateCounter();
-
- TestDebugLog.addEntryMessage(entry.partition(),
- entry.updateCounter(),
- "collect first cntr=" + entry.updateCounter() + " topVer=" + entry.topologyVersion());
-
- curTop = entry.topologyVersion();
-
- if (log.isDebugEnabled()) {
- log.debug("First event [lastFiredEvt=" + lastFiredEvt +
- ", curTop=" + curTop +
- ", entUpdCnt=" + entry.updateCounter() +
- ", partId=" + entry.partition() + ']');
- }
-
- return !entry.isFiltered() ?
- F.<CacheEntryEvent<? extends K, ? extends V>>
- asList(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry)) :
- Collections.<CacheEntryEvent<? extends K, ? extends V>>emptyList();
- }
-
- if (curTop.compareTo(entry.topologyVersion()) < 0) {
- if (entry.updateCounter() == 1L && !entry.isBackup()) {
- entries = new ArrayList<>(pendingEvts.size());
-
- for (CacheContinuousQueryEntry evt : pendingEvts.values()) {
- if (evt != HOLE && !evt.isFiltered())
- entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, evt));
- }
-
- pendingEvts.clear();
-
- curTop = entry.topologyVersion();
-
- lastFiredEvt = entry.updateCounter();
-
- TestDebugLog.addEntryMessage(entry.partition(),
- entry.updateCounter(),
- "collect for lost topVer cntr=" + entry.updateCounter() + " topVer=" + entry.topologyVersion());
-
- if (!entry.isFiltered())
- entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry));
-
- if (log.isDebugEnabled())
- log.debug("Partition was lost [lastFiredEvt=" + lastFiredEvt +
- ", curTop=" + curTop +
- ", entUpdCnt=" + entry.updateCounter() +
- ", partId=" + entry.partition() +
- ", pendingEvts=" + pendingEvts + ']');
-
- return entries;
- }
-
- curTop = entry.topologyVersion();
- }
-
- // Check duplicate.
- if (entry.updateCounter() > lastFiredEvt) {
- TestDebugLog.addEntryMessage(entry.partition(),
- entry.updateCounter(),
- "add event last=" + lastFiredEvt +
- " cntr=" + entry.updateCounter() +
- " key=" + (entry.isFiltered() ? "filtered" : entry.key().value(cctx.cacheObjectContext(), false)) +
- " val=" + (entry.isFiltered() ? "filtered" : entry.value().value(cctx.cacheObjectContext(), false)) +
- " topVer=" + entry.topologyVersion());
-
- pendingEvts.put(entry.updateCounter(), entry);
- }
- else {
- if (log.isDebugEnabled())
- log.debug("Skip duplicate continuous query message: " + entry);
-
- TestDebugLog.addEntryMessage(entry.partition(),
- entry.updateCounter(),
- "skip duplicate last=" + lastFiredEvt +
- " cntr=" + entry.updateCounter() +
- " key=" + (entry.isFiltered() ? "filtered" : entry.key().value(cctx.cacheObjectContext(), false)) +
- " val=" + (entry.isFiltered() ? "filtered" : entry.value().value(cctx.cacheObjectContext(), false)) +
- " topVer=" + entry.topologyVersion());
-
- return Collections.emptyList();
- }
-
- if (pendingEvts.isEmpty()) {
- if (log.isDebugEnabled()) {
- log.debug("Nothing sent to listener [lastFiredEvt=" + lastFiredEvt +
- ", curTop=" + curTop +
- ", entUpdCnt=" + entry.updateCounter() +
- ", partId=" + entry.partition() + ']');
- }
-
- return Collections.emptyList();
- }
-
- Iterator<Map.Entry<Long, CacheContinuousQueryEntry>> iter = pendingEvts.entrySet().iterator();
-
- entries = new ArrayList<>();
-
- if (pendingEvts.size() >= MAX_BUFF_SIZE) {
- if (log.isDebugEnabled()) {
- log.debug("Pending events reached max of buffer size [lastFiredEvt=" + lastFiredEvt +
- ", curTop=" + curTop +
- ", entUpdCnt=" + entry.updateCounter() +
- ", partId=" + entry.partition() +
- ", pendingEvts=" + pendingEvts + ']');
- }
-
- LT.warn(log, "Pending events reached max of buffer size [cache=" + cctx.name() +
- ", bufSize=" + MAX_BUFF_SIZE +
- ", partId=" + entry.partition() + ']');
-
- for (int i = 0; i < MAX_BUFF_SIZE - (MAX_BUFF_SIZE / 10); i++) {
- Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next();
-
- if (e.getValue() != HOLE && !e.getValue().isFiltered())
- entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, e.getValue()));
-
- lastFiredEvt = e.getKey();
-
- iter.remove();
- }
- }
- else {
- while (iter.hasNext()) {
- Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next();
-
- CacheContinuousQueryEntry pending = e.getValue();
-
- long filtered = pending.filteredCount();
-
- boolean fire = e.getKey() == lastFiredEvt + 1;;
-
- if (!fire && filtered > 0)
- fire = e.getKey() - filtered <= lastFiredEvt + 1;
-
- if (fire) {
- TestDebugLog.addEntryMessage(entry.partition(),
- entry.updateCounter(),
- "process last=" + lastFiredEvt +
- " cntr=" + e.getKey() +
- " key=" + (pending.isFiltered() ? "filtered" : pending.key().value(cctx.cacheObjectContext(), false)) +
- " val=" + (pending.isFiltered() ? "filtered" : pending.value().value(cctx.cacheObjectContext(), false)) +
- " topVer=" + e.getValue().topologyVersion() +
- " f=" + pending.filteredCount());
-
- lastFiredEvt = e.getKey();
-
- if (e.getValue() != HOLE && !e.getValue().isFiltered())
- entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, e.getValue()));
-
- iter.remove();
- }
- else {
- TestDebugLog.addEntryMessage(entry.partition(),
- entry.updateCounter(),
- "stop process last=" + lastFiredEvt + " cntr=" + e.getKey() + " topVer=" + e.getValue().topologyVersion() + " f=" + pending.filteredCount());
-
- break;
- }
- }
- }
- }
-
- if (log.isDebugEnabled()) {
- log.debug("Will send to listener the following events [entries=" + entries +
- ", lastFiredEvt=" + lastFiredEvt +
- ", curTop=" + curTop +
- ", entUpdCnt=" + entry.updateCounter() +
- ", partId=" + entry.partition() +
- ", pendingEvts=" + pendingEvts + ']');
- }
-
- return entries;
- }
+ return buf;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/529dec10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java
index 534ce9c..12eaa20 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java
@@ -31,8 +31,11 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.LT;
+import org.apache.ignite.spi.communication.tcp.TestDebugLog;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler.LSNR_MAX_BUF_SIZE;
+
/**
*
*/
@@ -41,7 +44,7 @@ class CacheContinuousQueryPartitionRecovery {
private static final CacheContinuousQueryEntry HOLE = new CacheContinuousQueryEntry();
/** */
- private final static int MAX_BUFF_SIZE = CacheContinuousQueryHandler.LSNR_MAX_BUF_SIZE;
+ private final static int MAX_BUFF_SIZE = LSNR_MAX_BUF_SIZE;
/** */
private IgniteLogger log;
@@ -116,6 +119,10 @@ class CacheContinuousQueryPartitionRecovery {
if (curTop == AffinityTopologyVersion.NONE) {
lastFiredEvt = entry.updateCounter();
+ TestDebugLog.addEntryMessage(entry.partition(),
+ entry.updateCounter(),
+ "collect first cntr=" + entry.updateCounter() + " topVer=" + entry.topologyVersion());
+
curTop = entry.topologyVersion();
if (log.isDebugEnabled()) {
@@ -146,6 +153,10 @@ class CacheContinuousQueryPartitionRecovery {
lastFiredEvt = entry.updateCounter();
+ TestDebugLog.addEntryMessage(entry.partition(),
+ entry.updateCounter(),
+ "collect for lost topVer cntr=" + entry.updateCounter() + " topVer=" + entry.topologyVersion());
+
if (!entry.isFiltered())
entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry));
@@ -163,12 +174,29 @@ class CacheContinuousQueryPartitionRecovery {
}
// Check duplicate.
- if (entry.updateCounter() > lastFiredEvt)
+ if (entry.updateCounter() > lastFiredEvt) {
+ TestDebugLog.addEntryMessage(entry.partition(),
+ entry.updateCounter(),
+ "add event last=" + lastFiredEvt +
+ " cntr=" + entry.updateCounter() +
+ " key=" + (entry.isFiltered() ? "filtered" : entry.key().value(cctx.cacheObjectContext(), false)) +
+ " val=" + (entry.isFiltered() ? "filtered" : entry.value().value(cctx.cacheObjectContext(), false)) +
+ " topVer=" + entry.topologyVersion());
+
pendingEvts.put(entry.updateCounter(), entry);
+ }
else {
if (log.isDebugEnabled())
log.debug("Skip duplicate continuous query message: " + entry);
+ TestDebugLog.addEntryMessage(entry.partition(),
+ entry.updateCounter(),
+ "skip duplicate last=" + lastFiredEvt +
+ " cntr=" + entry.updateCounter() +
+ " key=" + (entry.isFiltered() ? "filtered" : entry.key().value(cctx.cacheObjectContext(), false)) +
+ " val=" + (entry.isFiltered() ? "filtered" : entry.value().value(cctx.cacheObjectContext(), false)) +
+ " topVer=" + entry.topologyVersion());
+
return Collections.emptyList();
}
@@ -225,6 +253,15 @@ class CacheContinuousQueryPartitionRecovery {
fire = e.getKey() - filtered <= lastFiredEvt + 1;
if (fire) {
+ TestDebugLog.addEntryMessage(entry.partition(),
+ entry.updateCounter(),
+ "process last=" + lastFiredEvt +
+ " cntr=" + e.getKey() +
+ " key=" + (pending.isFiltered() ? "filtered" : pending.key().value(cctx.cacheObjectContext(), false)) +
+ " val=" + (pending.isFiltered() ? "filtered" : pending.value().value(cctx.cacheObjectContext(), false)) +
+ " topVer=" + e.getValue().topologyVersion() +
+ " f=" + pending.filteredCount());
+
lastFiredEvt = e.getKey();
if (e.getValue() != HOLE && !e.getValue().isFiltered())
@@ -232,8 +269,13 @@ class CacheContinuousQueryPartitionRecovery {
iter.remove();
}
- else
+ else {
+ TestDebugLog.addEntryMessage(entry.partition(),
+ entry.updateCounter(),
+ "stop process last=" + lastFiredEvt + " cntr=" + e.getKey() + " topVer=" + e.getValue().topologyVersion() + " f=" + pending.filteredCount());
+
break;
+ }
}
}
}