You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/25 15:37:05 UTC
[02/18] ignite git commit: cc
cc
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/460ba11a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/460ba11a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/460ba11a
Branch: refs/heads/ignite-5075-cc-debug
Commit: 460ba11adfdc49f98ba8c87c7e1390f17f720d7d
Parents: 8adfe7d
Author: sboikov <sb...@gridgain.com>
Authored: Thu May 25 09:27:42 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu May 25 09:50:49 2017 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheMapEntry.java | 7 ++--
.../CacheContinuousQueryEventBuffer.java | 36 +++++++++++++++++++-
.../continuous/CacheContinuousQueryHandler.java | 12 +++++--
...ContinuousQueryFailoverAbstractSelfTest.java | 14 +++++---
4 files changed, 59 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/460ba11a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 569d638..8a7cb95 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1845,9 +1845,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
CacheObject evtOldVal = cctx.unwrapTemporary(oldVal);
if (primary)
- TestDebugLog.addEntryMessage(partition(), evtVal.value(cctx.cacheObjectContext(), false), "primary notify cntr=" + c.updateRes.updateCounter() + " k=" + key.value(null, false));
+ TestDebugLog.addEntryMessage(partition(), topVer, // evtVal.value(cctx.cacheObjectContext(), false)
+ "primary notify cntr=" + c.updateRes.updateCounter() +
+ " k=" + key.value(null, false));
else
- TestDebugLog.addEntryMessage(key.value(null, false), evtVal.value(cctx.cacheObjectContext(), false), "backup notify cntr=" + c.updateRes.updateCounter() + " k=" + key.value(null, false));
+ TestDebugLog.addEntryMessage(key.value(null, false), topVer,
+ "backup notify cntr=" + c.updateRes.updateCounter() + " k=" + key.value(null, false));
cctx.continuousQueries().onEntryUpdated(lsnrs,
key,
http://git-wip-us.apache.org/repos/asf/ignite/blob/460ba11a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
index a308e39..b1bc7b0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.spi.communication.tcp.TestDebugLog;
import org.jetbrains.annotations.Nullable;
/**
@@ -41,6 +42,17 @@ public class CacheContinuousQueryEventBuffer {
/** */
private ConcurrentSkipListMap<Long, Object> pending = new ConcurrentSkipListMap<>();
+ /** */
+ private final int part;
+
+ public CacheContinuousQueryEventBuffer() {
+ part = 0;
+ }
+
+ public CacheContinuousQueryEventBuffer(int part) {
+ this.part = part;
+ }
+
/**
* @return Initial partition counter.
*/
@@ -88,6 +100,12 @@ public class CacheContinuousQueryEventBuffer {
if (batch == null || cntr < batch.startCntr) {
assert entry != null : cntr;
+ // batch may be null on this branch, so do not dereference it unguarded.
+ TestDebugLog.addEntryMessage(part, cntr,
+ "buffer rcd small start=" + (batch == null ? "null" : String.valueOf(batch.startCntr)) +
+ " cntr=" + cntr +
+ " topVer=" + ((CacheContinuousQueryEntry)entry).topologyVersion());
+
return entry;
}
@@ -95,8 +113,15 @@ public class CacheContinuousQueryEventBuffer {
if (cntr <= batch.endCntr)
res = batch.processEvent0(null, cntr, entry);
- else
+ else {
+ TestDebugLog.addEntryMessage(part,
+ cntr,
+ "buffer add pending start=" + batch.startCntr +
+ " cntr=" + cntr +
+ " topVer=" + ((CacheContinuousQueryEntry)entry).topologyVersion());
+
pending.put(cntr, entry);
+ }
Batch batch0 = curBatch.get();
@@ -128,6 +153,8 @@ public class CacheContinuousQueryEventBuffer {
if (curCntr == -1)
return null;
+ TestDebugLog.addEntryMessage(part, curCntr, "created batch");
+
batch = new Batch(curCntr + 1, 0L, new Object[BUF_SIZE]);
if (curBatch.compareAndSet(null, batch))
@@ -205,6 +232,13 @@ public class CacheContinuousQueryEventBuffer {
int pos = (int)(cntr - startCntr);
synchronized (this) {
+ TestDebugLog.addEntryMessage(part,
+ cntr,
+ "buffer process start=" + startCntr +
+ ", lastProc=" + lastProc +
+ " pos=" + pos +
+ " topVer=" + ((CacheContinuousQueryEntry)evt).topologyVersion());
+
evts[pos] = evt;
int next = lastProc + 1;
http://git-wip-us.apache.org/repos/asf/ignite/blob/460ba11a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 01fe5e7..3982815 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -775,8 +775,14 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
Collection<CacheContinuousQueryEntry> backupQueue0 = backupQueue;
- if (backupQueue0 != null)
+ if (backupQueue0 != null) {
+ TestDebugLog.addEntryMessage(entry.partition(),
+ entry.updateCounter(),
+ "add backup " +
+ " topVer=" + entry.topologyVersion());
+
backupQueue0.add(entry.forBackupQueue());
+ }
}
return notify;
@@ -948,7 +954,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
if (buf == null) {
final int part = e.partition();
- buf = new CacheContinuousQueryEventBuffer() {
+ buf = new CacheContinuousQueryEventBuffer(part) {
@Override protected long currentPartitionCounter() {
GridDhtLocalPartition locPart = cctx.topology().localPartition(part, null, false);
@@ -1182,7 +1188,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
boolean fire = e.getKey() == lastFiredEvt + 1;;
if (!fire && filtered > 0)
- fire = e.getKey() - filtered <= lastFiredEvt;
+ fire = e.getKey() - filtered <= lastFiredEvt + 1;
if (fire) {
TestDebugLog.addEntryMessage(entry.partition(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/460ba11a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index 050af5d..447a203 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -1220,15 +1220,15 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
}
}
- if (!lostAllow && lostEvts.size() > 100) {
+ if (!lostAllow && lostEvts.size() > 50) {
log.error("Lost event cnt: " + lostEvts.size());
for (T3<Object, Object, Object> e : lostEvts) {
- log.error("Lost event: " + ignite(0).affinity(DEFAULT_CACHE_NAME).partition(e.get1()) + " " + e);
+ log.error("Lost event: " + ignite(4).affinity(DEFAULT_CACHE_NAME).partition(e.get1()) + " " + e);
- TestDebugLog.addEntryMessage(ignite(0).affinity(DEFAULT_CACHE_NAME).partition(e.get1()), e.get2(), "lost event " + e.get1() + " " + e.get2());
+ TestDebugLog.addEntryMessage(ignite(4).affinity(DEFAULT_CACHE_NAME).partition(e.get1()), e.get2(), "lost event " + e.get1() + " " + e.get2());
- TestDebugLog.printKeyMessages(true, ignite(0).affinity(DEFAULT_CACHE_NAME).partition(e.get1()));
+ TestDebugLog.printKeyMessages(true, ignite(4).affinity(DEFAULT_CACHE_NAME).partition(e.get1()));
System.exit(1);
}
@@ -1598,6 +1598,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
for (int i = 0; i < 10; i++) {
final int idx = i % (SRV_NODES - 1);
+ TestDebugLog.addMessage("Stop node: " + idx);
+
log.info("Stop node: " + idx);
stopGrid(idx);
@@ -1609,6 +1611,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
for (int j = 0; j < aff.partitions(); j++) {
Integer oldVal = (Integer)qryClnCache.get(j);
+ TestDebugLog.addEntryMessage(ignite(4).affinity(DEFAULT_CACHE_NAME).partition(j), i, "do put " + j + " " + i);
+
qryClnCache.put(j, i);
afterRestEvts.add(new T3<>((Object)j, (Object)i, (Object)oldVal));
@@ -1616,6 +1620,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
checkEvents(new ArrayList<>(afterRestEvts), lsnr, false);
+ TestDebugLog.addMessage("Start node: " + idx);
+
log.info("Start node: " + idx);
startGrid(idx);