You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/25 15:37:13 UTC

[10/18] ignite git commit: Merge branch 'ignite-5075-cc' into ignite-5075-cc-debug

Merge branch 'ignite-5075-cc' into ignite-5075-cc-debug

# Conflicts:
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/529dec10
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/529dec10
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/529dec10

Branch: refs/heads/ignite-5075-cc-debug
Commit: 529dec1018290fb9d05890150f7dd7ca410902e5
Parents: 8ee752d
Author: sboikov <sb...@gridgain.com>
Authored: Thu May 25 13:21:12 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu May 25 13:21:12 2017 +0300

----------------------------------------------------------------------
 .../CacheContinuousQueryEventBuffer.java        |  11 -
 .../continuous/CacheContinuousQueryHandler.java | 259 +------------------
 .../CacheContinuousQueryPartitionRecovery.java  |  48 +++-
 3 files changed, 46 insertions(+), 272 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/529dec10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
index 6295b0b..59b92eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
@@ -87,17 +87,6 @@ public class CacheContinuousQueryEventBuffer {
         return null;
     }
 
-    /** */
-    private final int part;
-
-    public CacheContinuousQueryEventBuffer() {
-        part = 0;
-    }
-
-    public CacheContinuousQueryEventBuffer(int part) {
-        this.part = part;
-    }
-
     /**
      * @return Initial partition counter.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/529dec10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 2f88827..72fdd83 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -947,264 +947,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                 buf = oldBuf;
         }
 
-        return buf.processEntry(e);
-    }
-
-    /**
-     *
-     */
-    private static class PartitionRecovery {
-        /** Event which means hole in sequence. */
-        private static final CacheContinuousQueryEntry HOLE = new CacheContinuousQueryEntry();
-
-        /** */
-        private final static int MAX_BUFF_SIZE = LSNR_MAX_BUF_SIZE;
-
-        /** */
-        private IgniteLogger log;
-
-        /** */
-        private long lastFiredEvt;
-
-        /** */
-        private AffinityTopologyVersion curTop = AffinityTopologyVersion.NONE;
-
-        /** */
-        private final Map<Long, CacheContinuousQueryEntry> pendingEvts = new TreeMap<>();
-
-        /**
-         * @param log Logger.
-         * @param topVer Topology version.
-         * @param initCntr Update counters.
-         */
-        PartitionRecovery(IgniteLogger log, AffinityTopologyVersion topVer, @Nullable Long initCntr) {
-            this.log = log;
-
-            if (initCntr != null) {
-                assert topVer.topologyVersion() > 0 : topVer;
-
-                this.lastFiredEvt = initCntr;
-
-                curTop = topVer;
-            }
-        }
-
-        /**
-         * Resets cached topology.
-         */
-        void resetTopologyCache() {
-            curTop = AffinityTopologyVersion.NONE;
-        }
-
-        /**
-         * Add continuous entry.
-         *
-         * @param cctx Cache context.
-         * @param cache Cache.
-         * @param entry Cache continuous query entry.
-         * @return Collection entries which will be fired. This collection should contains only non-filtered events.
-         */
-        <K, V> Collection<CacheEntryEvent<? extends K, ? extends V>> collectEntries(
-            CacheContinuousQueryEntry entry,
-            GridCacheContext cctx,
-            IgniteCache cache
-        ) {
-            assert entry != null;
-
-            if (entry.topologyVersion() == null) { // Possible if entry is sent from old node.
-                assert entry.updateCounter() == 0L : entry;
-
-                return F.<CacheEntryEvent<? extends K, ? extends V>>
-                    asList(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry));
-            }
-
-            List<CacheEntryEvent<? extends K, ? extends V>> entries;
-
-            synchronized (pendingEvts) {
-                if (log.isDebugEnabled()) {
-                    log.debug("Handling event [lastFiredEvt=" + lastFiredEvt +
-                        ", curTop=" + curTop +
-                        ", entUpdCnt=" + entry.updateCounter() +
-                        ", partId=" + entry.partition() +
-                        ", pendingEvts=" + pendingEvts + ']');
-                }
-
-                // Received first event.
-                if (curTop == AffinityTopologyVersion.NONE) {
-                    lastFiredEvt = entry.updateCounter();
-
-                    TestDebugLog.addEntryMessage(entry.partition(),
-                        entry.updateCounter(),
-                        "collect first cntr=" + entry.updateCounter() + " topVer=" + entry.topologyVersion());
-
-                    curTop = entry.topologyVersion();
-
-                    if (log.isDebugEnabled()) {
-                        log.debug("First event [lastFiredEvt=" + lastFiredEvt +
-                            ", curTop=" + curTop +
-                            ", entUpdCnt=" + entry.updateCounter() +
-                            ", partId=" + entry.partition() + ']');
-                    }
-
-                    return !entry.isFiltered() ?
-                        F.<CacheEntryEvent<? extends K, ? extends V>>
-                            asList(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry)) :
-                        Collections.<CacheEntryEvent<? extends K, ? extends V>>emptyList();
-                }
-
-                if (curTop.compareTo(entry.topologyVersion()) < 0) {
-                    if (entry.updateCounter() == 1L && !entry.isBackup()) {
-                        entries = new ArrayList<>(pendingEvts.size());
-
-                        for (CacheContinuousQueryEntry evt : pendingEvts.values()) {
-                            if (evt != HOLE && !evt.isFiltered())
-                                entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, evt));
-                        }
-
-                        pendingEvts.clear();
-
-                        curTop = entry.topologyVersion();
-
-                        lastFiredEvt = entry.updateCounter();
-
-                        TestDebugLog.addEntryMessage(entry.partition(),
-                            entry.updateCounter(),
-                            "collect for lost topVer cntr=" + entry.updateCounter() + " topVer=" + entry.topologyVersion());
-
-                        if (!entry.isFiltered())
-                            entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry));
-
-                        if (log.isDebugEnabled())
-                            log.debug("Partition was lost [lastFiredEvt=" + lastFiredEvt +
-                                ", curTop=" + curTop +
-                                ", entUpdCnt=" + entry.updateCounter() +
-                                ", partId=" + entry.partition() +
-                                ", pendingEvts=" + pendingEvts + ']');
-
-                        return entries;
-                    }
-
-                    curTop = entry.topologyVersion();
-                }
-
-                // Check duplicate.
-                if (entry.updateCounter() > lastFiredEvt) {
-                    TestDebugLog.addEntryMessage(entry.partition(),
-                        entry.updateCounter(),
-                        "add event last=" + lastFiredEvt +
-                        " cntr=" + entry.updateCounter() +
-                        " key=" + (entry.isFiltered() ? "filtered" : entry.key().value(cctx.cacheObjectContext(), false))  +
-                        " val=" + (entry.isFiltered() ? "filtered" : entry.value().value(cctx.cacheObjectContext(), false))  +
-                        " topVer=" + entry.topologyVersion());
-
-                    pendingEvts.put(entry.updateCounter(), entry);
-                }
-                else {
-                    if (log.isDebugEnabled())
-                        log.debug("Skip duplicate continuous query message: " + entry);
-
-                    TestDebugLog.addEntryMessage(entry.partition(),
-                        entry.updateCounter(),
-                        "skip duplicate last=" + lastFiredEvt +
-                        " cntr=" + entry.updateCounter() +
-                        " key=" + (entry.isFiltered() ? "filtered" : entry.key().value(cctx.cacheObjectContext(), false))  +
-                        " val=" + (entry.isFiltered() ? "filtered" : entry.value().value(cctx.cacheObjectContext(), false))  +
-                        " topVer=" + entry.topologyVersion());
-
-                    return Collections.emptyList();
-                }
-
-                if (pendingEvts.isEmpty()) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Nothing sent to listener [lastFiredEvt=" + lastFiredEvt +
-                            ", curTop=" + curTop +
-                            ", entUpdCnt=" + entry.updateCounter() +
-                            ", partId=" + entry.partition() + ']');
-                    }
-
-                    return Collections.emptyList();
-                }
-
-                Iterator<Map.Entry<Long, CacheContinuousQueryEntry>> iter = pendingEvts.entrySet().iterator();
-
-                entries = new ArrayList<>();
-
-                if (pendingEvts.size() >= MAX_BUFF_SIZE) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Pending events reached max of buffer size [lastFiredEvt=" + lastFiredEvt +
-                            ", curTop=" + curTop +
-                            ", entUpdCnt=" + entry.updateCounter() +
-                            ", partId=" + entry.partition() +
-                            ", pendingEvts=" + pendingEvts + ']');
-                    }
-
-                    LT.warn(log, "Pending events reached max of buffer size [cache=" + cctx.name() +
-                        ", bufSize=" + MAX_BUFF_SIZE +
-                        ", partId=" + entry.partition() + ']');
-
-                    for (int i = 0; i < MAX_BUFF_SIZE - (MAX_BUFF_SIZE / 10); i++) {
-                        Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next();
-
-                        if (e.getValue() != HOLE && !e.getValue().isFiltered())
-                            entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, e.getValue()));
-
-                        lastFiredEvt = e.getKey();
-
-                        iter.remove();
-                    }
-                }
-                else {
-                    while (iter.hasNext()) {
-                        Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next();
-
-                        CacheContinuousQueryEntry pending = e.getValue();
-
-                        long filtered = pending.filteredCount();
-
-                        boolean fire = e.getKey() == lastFiredEvt + 1;;
-
-                        if (!fire && filtered > 0)
-                            fire = e.getKey() - filtered <= lastFiredEvt + 1;
-
-                        if (fire) {
-                            TestDebugLog.addEntryMessage(entry.partition(),
-                                entry.updateCounter(),
-                                "process last=" + lastFiredEvt +
-                                " cntr=" + e.getKey() +
-                                " key=" + (pending.isFiltered() ? "filtered" : pending.key().value(cctx.cacheObjectContext(), false))  +
-                                " val=" + (pending.isFiltered() ? "filtered" : pending.value().value(cctx.cacheObjectContext(), false))  +
-                                " topVer=" + e.getValue().topologyVersion() +
-                                " f=" + pending.filteredCount());
-
-                            lastFiredEvt = e.getKey();
-
-                            if (e.getValue() != HOLE && !e.getValue().isFiltered())
-                                entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, e.getValue()));
-
-                            iter.remove();
-                        }
-                        else {
-                            TestDebugLog.addEntryMessage(entry.partition(),
-                                entry.updateCounter(),
-                                "stop process last=" + lastFiredEvt + " cntr=" + e.getKey() + " topVer=" + e.getValue().topologyVersion() + " f=" + pending.filteredCount());
-
-                            break;
-                        }
-                    }
-                }
-            }
-
-            if (log.isDebugEnabled()) {
-                log.debug("Will send to listener the following events [entries=" + entries +
-                    ", lastFiredEvt=" + lastFiredEvt +
-                    ", curTop=" + curTop +
-                    ", entUpdCnt=" + entry.updateCounter() +
-                    ", partId=" + entry.partition() +
-                    ", pendingEvts=" + pendingEvts + ']');
-            }
-
-            return entries;
-        }
+        return buf;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/529dec10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java
index 534ce9c..12eaa20 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java
@@ -31,8 +31,11 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.LT;
+import org.apache.ignite.spi.communication.tcp.TestDebugLog;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler.LSNR_MAX_BUF_SIZE;
+
 /**
  *
  */
@@ -41,7 +44,7 @@ class CacheContinuousQueryPartitionRecovery {
     private static final CacheContinuousQueryEntry HOLE = new CacheContinuousQueryEntry();
 
     /** */
-    private final static int MAX_BUFF_SIZE = CacheContinuousQueryHandler.LSNR_MAX_BUF_SIZE;
+    private final static int MAX_BUFF_SIZE = LSNR_MAX_BUF_SIZE;
 
     /** */
     private IgniteLogger log;
@@ -116,6 +119,10 @@ class CacheContinuousQueryPartitionRecovery {
             if (curTop == AffinityTopologyVersion.NONE) {
                 lastFiredEvt = entry.updateCounter();
 
+                TestDebugLog.addEntryMessage(entry.partition(),
+                    entry.updateCounter(),
+                    "collect first cntr=" + entry.updateCounter() + " topVer=" + entry.topologyVersion());
+
                 curTop = entry.topologyVersion();
 
                 if (log.isDebugEnabled()) {
@@ -146,6 +153,10 @@ class CacheContinuousQueryPartitionRecovery {
 
                     lastFiredEvt = entry.updateCounter();
 
+                    TestDebugLog.addEntryMessage(entry.partition(),
+                        entry.updateCounter(),
+                        "collect for lost topVer cntr=" + entry.updateCounter() + " topVer=" + entry.topologyVersion());
+
                     if (!entry.isFiltered())
                         entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry));
 
@@ -163,12 +174,29 @@ class CacheContinuousQueryPartitionRecovery {
             }
 
             // Check duplicate.
-            if (entry.updateCounter() > lastFiredEvt)
+            if (entry.updateCounter() > lastFiredEvt) {
+                TestDebugLog.addEntryMessage(entry.partition(),
+                    entry.updateCounter(),
+                    "add event last=" + lastFiredEvt +
+                        " cntr=" + entry.updateCounter() +
+                        " key=" + (entry.isFiltered() ? "filtered" : entry.key().value(cctx.cacheObjectContext(), false))  +
+                        " val=" + (entry.isFiltered() ? "filtered" : entry.value().value(cctx.cacheObjectContext(), false))  +
+                        " topVer=" + entry.topologyVersion());
+
                 pendingEvts.put(entry.updateCounter(), entry);
+            }
             else {
                 if (log.isDebugEnabled())
                     log.debug("Skip duplicate continuous query message: " + entry);
 
+                TestDebugLog.addEntryMessage(entry.partition(),
+                    entry.updateCounter(),
+                    "skip duplicate last=" + lastFiredEvt +
+                        " cntr=" + entry.updateCounter() +
+                        " key=" + (entry.isFiltered() ? "filtered" : entry.key().value(cctx.cacheObjectContext(), false))  +
+                        " val=" + (entry.isFiltered() ? "filtered" : entry.value().value(cctx.cacheObjectContext(), false))  +
+                        " topVer=" + entry.topologyVersion());
+
                 return Collections.emptyList();
             }
 
@@ -225,6 +253,15 @@ class CacheContinuousQueryPartitionRecovery {
                         fire = e.getKey() - filtered <= lastFiredEvt + 1;
 
                     if (fire) {
+                        TestDebugLog.addEntryMessage(entry.partition(),
+                            entry.updateCounter(),
+                            "process last=" + lastFiredEvt +
+                                " cntr=" + e.getKey() +
+                                " key=" + (pending.isFiltered() ? "filtered" : pending.key().value(cctx.cacheObjectContext(), false))  +
+                                " val=" + (pending.isFiltered() ? "filtered" : pending.value().value(cctx.cacheObjectContext(), false))  +
+                                " topVer=" + e.getValue().topologyVersion() +
+                                " f=" + pending.filteredCount());
+
                         lastFiredEvt = e.getKey();
 
                         if (e.getValue() != HOLE && !e.getValue().isFiltered())
@@ -232,8 +269,13 @@ class CacheContinuousQueryPartitionRecovery {
 
                         iter.remove();
                     }
-                    else
+                    else {
+                        TestDebugLog.addEntryMessage(entry.partition(),
+                            entry.updateCounter(),
+                            "stop process last=" + lastFiredEvt + " cntr=" + e.getKey() + " topVer=" + e.getValue().topologyVersion() + " f=" + pending.filteredCount());
+
                         break;
+                    }
                 }
             }
         }