You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/25 15:37:18 UTC

[15/18] ignite git commit: cc

cc


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d2942a58
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d2942a58
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d2942a58

Branch: refs/heads/ignite-5075-cc-debug
Commit: d2942a58971a6d3797781151283c42549f0cc8f6
Parents: 8fafde2
Author: sboikov <sb...@gridgain.com>
Authored: Thu May 25 15:25:32 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu May 25 15:25:32 2017 +0300

----------------------------------------------------------------------
 .../CacheContinuousQueryEventBuffer.java        | 220 ++++++++++++++-----
 .../CacheContinuousQueryPartitionRecovery.java  |  19 +-
 2 files changed, 184 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d2942a58/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
index 59b92eb..264a6f1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.spi.communication.tcp.TestDebugLog;
 import org.jetbrains.annotations.Nullable;
 
@@ -76,15 +77,39 @@ public class CacheContinuousQueryEventBuffer {
      * @return Backup entries.
      */
     @Nullable Collection<CacheContinuousQueryEntry> resetBackupQueue() {
+        Collection<CacheContinuousQueryEntry> ret = null; // Result: detached backup queue and/or current batch entries.
+
+        List<CacheContinuousQueryEntry> entries = null;
+
+        Batch batch = curBatch.get();
+
+        if (batch != null)
+            entries = batch.backupFlushEntries(); // May be null when the batch has nothing to flush.
+
         if (!backupQ.isEmpty()) {
-            ConcurrentLinkedDeque<CacheContinuousQueryEntry> ret = this.backupQ;
+            ret = this.backupQ;
+
+            if (entries != null) {
+                for (CacheContinuousQueryEntry e : entries) // NOTE(review): addFirst per element prepends in reverse order - confirm intended.
+                    ((ConcurrentLinkedDeque<CacheContinuousQueryEntry>)ret).addFirst(e); // Parameterized cast instead of a raw type.
+            }
 
             backupQ = new ConcurrentLinkedDeque<>();
+        }
+        else
+            ret = entries;
 
-            return ret;
+        if (ret != null) {
+            for (CacheContinuousQueryEntry e : ret)
+                TestDebugLog.addEntryMessage(part,
+                    e.updateCounter(),
+                    "filtered1 " + e.filteredCount() +
+                        " reset backup");
         }
+        else
+            TestDebugLog.addEntryMessage(part, part, "no backup");
 
-        return null;
+        return ret; // Fix: return the merged result; returning 'entries' dropped the detached backup queue.
     }
 
     /**
@@ -123,7 +148,7 @@ public class CacheContinuousQueryEventBuffer {
     private Object process0(long cntr, CacheContinuousQueryEntry entry, boolean backup) {
         assert cntr >= 0 : cntr;
 
-        Batch batch = initBatch();
+        Batch batch = initBatch(entry.topologyVersion());
 
         if (batch == null || cntr < batch.startCntr) {
             assert entry != null : cntr;
@@ -173,7 +198,7 @@ public class CacheContinuousQueryEventBuffer {
     /**
      * @return Current batch.
      */
-    @Nullable private Batch initBatch() {
+    @Nullable private Batch initBatch(AffinityTopologyVersion topVer) {
         Batch batch = curBatch.get();
 
         if (batch != null)
@@ -186,7 +211,7 @@ public class CacheContinuousQueryEventBuffer {
 
         TestDebugLog.addEntryMessage(part, curCntr, "created batch");
 
-        batch = new Batch(curCntr + 1, 0L, new Object[BUF_SIZE]);
+        batch = new Batch(curCntr + 1, 0L, new CacheContinuousQueryEntry[BUF_SIZE], topVer);
 
         if (curBatch.compareAndSet(null, batch))
             return batch;
@@ -232,28 +257,108 @@ public class CacheContinuousQueryEventBuffer {
         private int lastProc = -1;
 
         /** */
-        private final Object[] evts;
+        private final CacheContinuousQueryEntry[] entries;
+
+        /** */
+        private final AffinityTopologyVersion topVer;
 
         /**
          * @param filtered Number of filtered events before this batch.
-         * @param evts Events array.
+         * @param entries Entries array.
          * @param startCntr Start counter.
          */
-        Batch(long startCntr, long filtered, Object[] evts) {
+        Batch(long startCntr, long filtered, CacheContinuousQueryEntry[] entries, AffinityTopologyVersion topVer) { // topVer: topology version kept for entries synthesized by this batch.
             assert startCntr >= 0;
             assert filtered >= 0;
 
             this.startCntr = startCntr;
             this.filtered = filtered;
-            this.evts = evts;
+            this.entries = entries; // The array is stored as passed, not copied.
+            this.topVer = topVer; // Read by filteredEntry() when it builds placeholder entries.
 
             endCntr = startCntr + BUF_SIZE - 1;
         }
 
+        synchronized List<CacheContinuousQueryEntry> backupFlushEntries() { // Builds flush entries from the batch slots; returns null when there is nothing to flush.
+            List<CacheContinuousQueryEntry> res = null;
+
+            long filtered = this.filtered; // Local running count; the field itself is not written in this method.
+            long cntr = startCntr; // Counter corresponding to slot i is startCntr + i.
+
+            for (int i = 0; i < entries.length; i++) {
+                CacheContinuousQueryEntry e = entries[i];
+
+                CacheContinuousQueryEntry flushEntry = null;
+
+                if (e == null) { // Empty slot.
+                    if (filtered != 0) {
+                        flushEntry = filteredEntry(cntr - 1, filtered - 1); // Placeholder at the previous slot's counter carrying the rest of the filtered run.
+
+                        filtered = 0;
+                    }
+                }
+                else {
+                    if (e.isFiltered())
+                        filtered++; // Filtered entries are only counted here, not emitted one by one.
+                    else {
+                        flushEntry = new CacheContinuousQueryEntry(e.cacheId(), // Copy of e, so filteredCount below is set on the copy only.
+                            e.eventType(),
+                            e.key(),
+                            e.value(),
+                            e.oldValue(),
+                            e.isKeepBinary(),
+                            e.partition(),
+                            e.updateCounter(),
+                            e.topologyVersion());
+
+                        flushEntry.filteredCount(filtered); // Number of filtered entries counted since the last emitted entry.
+
+                        filtered = 0;
+                    }
+                }
+
+                if (flushEntry != null) {
+                    if (res == null)
+                        res = new ArrayList<>(); // Allocated lazily so that an empty result stays null.
+
+                    res.add(flushEntry);
+                }
+
+                cntr++;
+            }
+
+            if (filtered != 0L) { // Filtered run that reaches the end of the array.
+                if (res == null)
+                    res = new ArrayList<>();
+
+                res.add(filteredEntry(cntr - 1, filtered - 1)); // cntr - 1 is the counter of the last slot.
+            }
+
+            return res;
+        }
+
+        private CacheContinuousQueryEntry filteredEntry(long cntr, long filtered) { // Creates a placeholder entry marked as filtered for counter cntr.
+            CacheContinuousQueryEntry e = new CacheContinuousQueryEntry(0, // Cache ID (argument roles inferred from the copy call in backupFlushEntries()).
+                null, // Event type.
+                 null, // Key.
+                 null, // Value.
+                 null, // Old value.
+                 false, // Keep-binary flag.
+                 part, // Partition of the enclosing buffer.
+                 cntr, // Update counter.
+                 topVer); // Topology version this batch was created with.
+
+            e.markFiltered();
+
+            e.filteredCount(filtered); // Filtered count carried by this placeholder.
+
+            return e;
+        }
+
         /**
          * @param res Current result.
-         * @param cntr Event counter.
-         * @param evt Event.
+         * @param cntr Entry counter.
+         * @param entry Entry.
          * @param backup Backup entry flag.
          * @return New result.
          */
@@ -261,7 +366,7 @@ public class CacheContinuousQueryEventBuffer {
         @Nullable private Object processEvent0(
             @Nullable Object res,
             long cntr,
-            CacheContinuousQueryEntry evt,
+            CacheContinuousQueryEntry entry,
             boolean backup) {
             int pos = (int)(cntr - startCntr);
 
@@ -271,56 +376,67 @@ public class CacheContinuousQueryEventBuffer {
                     "buffer process start=" + startCntr +
                         ", lastProc=" + lastProc +
                         " pos=" + pos +
-                        " topVer=" + ((CacheContinuousQueryEntry)evt).topologyVersion());
+                        " topVer=" + entry.topologyVersion());
 
-                evts[pos] = evt;
+                entries[pos] = entry;
 
                 int next = lastProc + 1;
 
                 if (next == pos) {
-                    for (int i = next; i < evts.length; i++) {
-                        Object e = evts[i];
-
-                        if (e != null) {
-                            if (e.getClass() == Long.class)
-                                filtered++;
-                            else {
-                                CacheContinuousQueryEntry evt0 = (CacheContinuousQueryEntry)e;
+                    for (int i = next; i < entries.length; i++) {
+                        CacheContinuousQueryEntry entry0 = entries[i];
+
+                        if (entry0 != null) {
+                            if (!entry0.isFiltered()) {
+                                TestDebugLog.addEntryMessage(part,
+                                    cntr,
+                                    "buffer process res start=" + startCntr +
+                                        ", lastProc=" + lastProc +
+                                        " pos=" + pos +
+                                        ", filtered=" + filtered +
+                                        " topVer=" + entry0.topologyVersion());
+
+                                entry0.filteredCount(filtered);
+
+                                filtered = 0;
+
+                                if (res == null) {
+                                    if (backup)
+                                        backupQ.add(entry0);
+                                    else
+                                        res = entry0;
+                                }
+                                else {
+                                    assert !backup;
 
-                                if (!evt0.isFiltered()) {
-                                    evt0.filteredCount(filtered);
+                                    List<CacheContinuousQueryEntry> resList;
 
-                                    filtered = 0;
+                                    if (res instanceof CacheContinuousQueryEntry) {
+                                        resList = new ArrayList<>();
 
-                                    if (res == null) {
-                                        if (backup)
-                                            backupQ.add(evt0);
-                                        else
-                                            res = evt0;
+                                        resList.add((CacheContinuousQueryEntry)res);
                                     }
                                     else {
-                                        assert !backup;
-
-                                        List<CacheContinuousQueryEntry> resList;
-
-                                        if (res instanceof CacheContinuousQueryEntry) {
-                                            resList = new ArrayList<>();
-
-                                            resList.add((CacheContinuousQueryEntry)res);
-                                        }
-                                        else {
-                                            assert res instanceof List : res;
+                                        assert res instanceof List : res;
 
-                                            resList = (List<CacheContinuousQueryEntry>)res;
-                                        }
+                                        resList = (List<CacheContinuousQueryEntry>)res;
+                                    }
 
-                                        resList.add(evt0);
+                                    resList.add(entry0);
 
-                                        res = resList;
-                                    }
+                                    res = resList;
                                 }
-                                else
-                                    filtered++;
+                            }
+                            else {
+                                filtered++;
+
+                                TestDebugLog.addEntryMessage(part,
+                                    cntr,
+                                    "buffer process inc filtered start=" + startCntr +
+                                        ", lastProc=" + lastProc +
+                                        " pos=" + pos +
+                                        ", filtered=" + filtered +
+                                        " topVer=" + entry0.topologyVersion());
                             }
 
                             pos = i;
@@ -335,10 +451,10 @@ public class CacheContinuousQueryEventBuffer {
                     return res;
             }
 
-            if (pos == evts.length -1) {
-                Arrays.fill(evts, null);
+            if (pos == entries.length -1) {
+                Arrays.fill(entries, null);
 
-                Batch nextBatch = new Batch(this.startCntr + BUF_SIZE, filtered, evts);
+                Batch nextBatch = new Batch(this.startCntr + BUF_SIZE, filtered, entries, entry.topologyVersion());
 
                 curBatch.set(nextBatch);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d2942a58/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java
index 12eaa20..e031428 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java
@@ -41,7 +41,13 @@ import static org.apache.ignite.internal.processors.cache.query.continuous.Cache
  */
 class CacheContinuousQueryPartitionRecovery {
     /** Event which means hole in sequence. */
-    private static final CacheContinuousQueryEntry HOLE = new CacheContinuousQueryEntry();
+    private static final CacheContinuousQueryEntry HOLE; // Assigned in the static initializer below so it can also be marked filtered.
+
+    static  {
+        HOLE = new CacheContinuousQueryEntry();
+
+        HOLE.markFiltered(); // NOTE(review): presumably so isFiltered() checks in this class treat HOLE as filtered - confirm.
+    }
 
     /** */
     private final static int MAX_BUFF_SIZE = LSNR_MAX_BUF_SIZE;
@@ -56,7 +62,7 @@ class CacheContinuousQueryPartitionRecovery {
     private AffinityTopologyVersion curTop = AffinityTopologyVersion.NONE;
 
     /** */
-    private final Map<Long, CacheContinuousQueryEntry> pendingEvts = new TreeMap<>();
+    private final TreeMap<Long, CacheContinuousQueryEntry> pendingEvts = new TreeMap<>();
 
     /**
      * @param log Logger.
@@ -240,6 +246,8 @@ class CacheContinuousQueryPartitionRecovery {
                 }
             }
             else {
+                boolean skip = false;
+
                 while (iter.hasNext()) {
                     Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next();
 
@@ -269,7 +277,9 @@ class CacheContinuousQueryPartitionRecovery {
 
                         iter.remove();
                     }
-                    else {
+                    else if (!pending.isFiltered()) {
+                        skip = true;
+
                         TestDebugLog.addEntryMessage(entry.partition(),
                             entry.updateCounter(),
                             "stop process last=" + lastFiredEvt + " cntr=" + e.getKey() + " topVer=" + e.getValue().topologyVersion() + " f=" + pending.filteredCount());
@@ -277,6 +287,9 @@ class CacheContinuousQueryPartitionRecovery {
                         break;
                     }
                 }
+
+                if (skip)
+                    pendingEvts.headMap(lastFiredEvt).clear();
             }
         }