You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/25 15:37:05 UTC

[02/18] ignite git commit: cc

cc


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/460ba11a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/460ba11a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/460ba11a

Branch: refs/heads/ignite-5075-cc-debug
Commit: 460ba11adfdc49f98ba8c87c7e1390f17f720d7d
Parents: 8adfe7d
Author: sboikov <sb...@gridgain.com>
Authored: Thu May 25 09:27:42 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu May 25 09:50:49 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheMapEntry.java     |  7 ++--
 .../CacheContinuousQueryEventBuffer.java        | 36 +++++++++++++++++++-
 .../continuous/CacheContinuousQueryHandler.java | 12 +++++--
 ...ContinuousQueryFailoverAbstractSelfTest.java | 14 +++++---
 4 files changed, 59 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/460ba11a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 569d638..8a7cb95 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1845,9 +1845,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 CacheObject evtOldVal = cctx.unwrapTemporary(oldVal);
 
                 if (primary)
-                    TestDebugLog.addEntryMessage(partition(), evtVal.value(cctx.cacheObjectContext(), false), "primary notify cntr=" + c.updateRes.updateCounter() + " k=" + key.value(null, false));
+                    TestDebugLog.addEntryMessage(partition(), topVer, // evtVal.value(cctx.cacheObjectContext(), false)
+                        "primary notify cntr=" + c.updateRes.updateCounter() +
+                            " k=" + key.value(null, false));
                 else
-                    TestDebugLog.addEntryMessage(key.value(null, false), evtVal.value(cctx.cacheObjectContext(), false), "backup notify cntr=" + c.updateRes.updateCounter() + " k=" + key.value(null, false));
+                    TestDebugLog.addEntryMessage(key.value(null, false), topVer,
+                        "backup notify cntr=" + c.updateRes.updateCounter() + " k=" + key.value(null, false));
 
                 cctx.continuousQueries().onEntryUpdated(lsnrs,
                     key,

http://git-wip-us.apache.org/repos/asf/ignite/blob/460ba11a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
index a308e39..b1bc7b0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.spi.communication.tcp.TestDebugLog;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -41,6 +42,17 @@ public class CacheContinuousQueryEventBuffer {
     /** */
     private ConcurrentSkipListMap<Long, Object> pending = new ConcurrentSkipListMap<>();
 
+    /** */
+    private final int part;
+
+    public CacheContinuousQueryEventBuffer() {
+        part = 0;
+    }
+
+    public CacheContinuousQueryEventBuffer(int part) {
+        this.part = part;
+    }
+
     /**
      * @return Initial partition counter.
      */
@@ -88,6 +100,12 @@ public class CacheContinuousQueryEventBuffer {
         if (batch == null || cntr < batch.startCntr) {
             assert entry != null : cntr;
 
+            TestDebugLog.addEntryMessage(part,
+                cntr,
+                "buffer rcd small start=" + batch.startCntr +
+                    " cntr=" + cntr +
+                    " topVer=" + ((CacheContinuousQueryEntry)entry).topologyVersion());
+
             return entry;
         }
 
@@ -95,8 +113,15 @@ public class CacheContinuousQueryEventBuffer {
 
         if (cntr <= batch.endCntr)
             res = batch.processEvent0(null, cntr, entry);
-        else
+        else {
+            TestDebugLog.addEntryMessage(part,
+                cntr,
+                "buffer add pending start=" + batch.startCntr +
+                    " cntr=" + cntr +
+                    " topVer=" + ((CacheContinuousQueryEntry)entry).topologyVersion());
+
             pending.put(cntr, entry);
+        }
 
         Batch batch0 = curBatch.get();
 
@@ -128,6 +153,8 @@ public class CacheContinuousQueryEventBuffer {
         if (curCntr == -1)
             return null;
 
+        TestDebugLog.addEntryMessage(part, curCntr, "created batch");
+
         batch = new Batch(curCntr + 1, 0L, new Object[BUF_SIZE]);
 
         if (curBatch.compareAndSet(null, batch))
@@ -205,6 +232,13 @@ public class CacheContinuousQueryEventBuffer {
             int pos = (int)(cntr - startCntr);
 
             synchronized (this) {
+                TestDebugLog.addEntryMessage(part,
+                    cntr,
+                    "buffer process start=" + startCntr +
+                        ", lastProc=" + lastProc +
+                        " pos=" + pos +
+                        " topVer=" + ((CacheContinuousQueryEntry)evt).topologyVersion());
+
                 evts[pos] = evt;
 
                 int next = lastProc + 1;

http://git-wip-us.apache.org/repos/asf/ignite/blob/460ba11a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 01fe5e7..3982815 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -775,8 +775,14 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
             Collection<CacheContinuousQueryEntry> backupQueue0 = backupQueue;
 
-            if (backupQueue0 != null)
+            if (backupQueue0 != null) {
+                TestDebugLog.addEntryMessage(entry.partition(),
+                    entry.updateCounter(),
+                    "add backup " +
+                        " topVer=" + entry.topologyVersion());
+
                 backupQueue0.add(entry.forBackupQueue());
+            }
         }
 
         return notify;
@@ -948,7 +954,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         if (buf == null) {
             final int part = e.partition();
 
-            buf = new CacheContinuousQueryEventBuffer() {
+            buf = new CacheContinuousQueryEventBuffer(part) {
                 @Override protected long currentPartitionCounter() {
                     GridDhtLocalPartition locPart = cctx.topology().localPartition(part, null, false);
 
@@ -1182,7 +1188,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                         boolean fire = e.getKey() == lastFiredEvt + 1;;
 
                         if (!fire && filtered > 0)
-                            fire = e.getKey() - filtered <= lastFiredEvt;
+                            fire = e.getKey() - filtered <= lastFiredEvt + 1;
 
                         if (fire) {
                             TestDebugLog.addEntryMessage(entry.partition(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/460ba11a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index 050af5d..447a203 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -1220,15 +1220,15 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
                 }
             }
 
-            if (!lostAllow && lostEvts.size() > 100) {
+            if (!lostAllow && lostEvts.size() > 50) {
                 log.error("Lost event cnt: " + lostEvts.size());
 
                 for (T3<Object, Object, Object> e : lostEvts) {
-                    log.error("Lost event: " + ignite(0).affinity(DEFAULT_CACHE_NAME).partition(e.get1()) + " " + e);
+                    log.error("Lost event: " + ignite(4).affinity(DEFAULT_CACHE_NAME).partition(e.get1()) + " " + e);
 
-                    TestDebugLog.addEntryMessage(ignite(0).affinity(DEFAULT_CACHE_NAME).partition(e.get1()), e.get2(), "lost event " + e.get1() + " " + e.get2());
+                    TestDebugLog.addEntryMessage(ignite(4).affinity(DEFAULT_CACHE_NAME).partition(e.get1()), e.get2(), "lost event " + e.get1() + " " + e.get2());
 
-                    TestDebugLog.printKeyMessages(true, ignite(0).affinity(DEFAULT_CACHE_NAME).partition(e.get1()));
+                    TestDebugLog.printKeyMessages(true, ignite(4).affinity(DEFAULT_CACHE_NAME).partition(e.get1()));
 
                     System.exit(1);
                 }
@@ -1598,6 +1598,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
         for (int i = 0; i < 10; i++) {
             final int idx = i % (SRV_NODES - 1);
 
+            TestDebugLog.addMessage("Stop node: " + idx);
+
             log.info("Stop node: " + idx);
 
             stopGrid(idx);
@@ -1609,6 +1611,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
             for (int j = 0; j < aff.partitions(); j++) {
                 Integer oldVal = (Integer)qryClnCache.get(j);
 
+                TestDebugLog.addEntryMessage(ignite(4).affinity(DEFAULT_CACHE_NAME).partition(j), i, "do put " + j + " " + i);
+
                 qryClnCache.put(j, i);
 
                 afterRestEvts.add(new T3<>((Object)j, (Object)i, (Object)oldVal));
@@ -1616,6 +1620,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
 
             checkEvents(new ArrayList<>(afterRestEvts), lsnr, false);
 
+            TestDebugLog.addMessage("Start node: " + idx);
+
             log.info("Start node: " + idx);
 
             startGrid(idx);