You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/25 15:37:18 UTC
[15/18] ignite git commit: cc
cc
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d2942a58
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d2942a58
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d2942a58
Branch: refs/heads/ignite-5075-cc-debug
Commit: d2942a58971a6d3797781151283c42549f0cc8f6
Parents: 8fafde2
Author: sboikov <sb...@gridgain.com>
Authored: Thu May 25 15:25:32 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu May 25 15:25:32 2017 +0300
----------------------------------------------------------------------
.../CacheContinuousQueryEventBuffer.java | 220 ++++++++++++++-----
.../CacheContinuousQueryPartitionRecovery.java | 19 +-
2 files changed, 184 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d2942a58/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
index 59b92eb..264a6f1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.spi.communication.tcp.TestDebugLog;
import org.jetbrains.annotations.Nullable;
@@ -76,15 +77,39 @@ public class CacheContinuousQueryEventBuffer {
* @return Backup entries.
*/
@Nullable Collection<CacheContinuousQueryEntry> resetBackupQueue() {
+ Collection<CacheContinuousQueryEntry> ret = null; // Result: old backup queue merged with batch entries, or batch entries alone.
+
+ List<CacheContinuousQueryEntry> entries = null;
+
+ Batch batch = curBatch.get();
+
+ if (batch != null)
+ entries = batch.backupFlushEntries(); // Flush entries built from the current batch's array (may be null).
+
if (!backupQ.isEmpty()) {
- ConcurrentLinkedDeque<CacheContinuousQueryEntry> ret = this.backupQ;
+ ret = this.backupQ;
+
+ if (entries != null) {
+ for (CacheContinuousQueryEntry e : entries)
+ ((ConcurrentLinkedDeque<CacheContinuousQueryEntry>)ret).addFirst(e); // NOTE(review): addFirst in a forward loop prepends the batch entries in reverse order - confirm this is intended.
+ }
backupQ = new ConcurrentLinkedDeque<>();
+ }
+ else
+ ret = entries;
- return ret;
+ if (ret != null) {
+ for (CacheContinuousQueryEntry e : ret)
+ TestDebugLog.addEntryMessage(part,
+ e.updateCounter(),
+ "filtered1 " + e.filteredCount() +
+ " reset backup");
}
+ else
+ TestDebugLog.addEntryMessage(part, part, "no backup");
- return null;
+ return ret; // Fix: return the merged result; returning 'entries' dropped the detached backup queue contents.
}
/**
@@ -123,7 +148,7 @@ public class CacheContinuousQueryEventBuffer {
private Object process0(long cntr, CacheContinuousQueryEntry entry, boolean backup) {
assert cntr >= 0 : cntr;
- Batch batch = initBatch();
+ Batch batch = initBatch(entry.topologyVersion());
if (batch == null || cntr < batch.startCntr) {
assert entry != null : cntr;
@@ -173,7 +198,7 @@ public class CacheContinuousQueryEventBuffer {
/**
* @return Current batch.
*/
- @Nullable private Batch initBatch() {
+ @Nullable private Batch initBatch(AffinityTopologyVersion topVer) {
Batch batch = curBatch.get();
if (batch != null)
@@ -186,7 +211,7 @@ public class CacheContinuousQueryEventBuffer {
TestDebugLog.addEntryMessage(part, curCntr, "created batch");
- batch = new Batch(curCntr + 1, 0L, new Object[BUF_SIZE]);
+ batch = new Batch(curCntr + 1, 0L, new CacheContinuousQueryEntry[BUF_SIZE], topVer);
if (curBatch.compareAndSet(null, batch))
return batch;
@@ -232,28 +257,108 @@ public class CacheContinuousQueryEventBuffer {
private int lastProc = -1;
/** */
- private final Object[] evts;
+ private final CacheContinuousQueryEntry[] entries;
+
+ /** */
+ private final AffinityTopologyVersion topVer;
/**
* @param filtered Number of filtered events before this batch.
- * @param evts Events array.
+ * @param entries Entries array.
* @param startCntr Start counter.
*/
- Batch(long startCntr, long filtered, Object[] evts) {
+ Batch(long startCntr, long filtered, CacheContinuousQueryEntry[] entries, AffinityTopologyVersion topVer) {
assert startCntr >= 0;
assert filtered >= 0;
this.startCntr = startCntr;
this.filtered = filtered;
- this.evts = evts;
+ this.entries = entries;
+ this.topVer = topVer;
endCntr = startCntr + BUF_SIZE - 1;
}
+ synchronized List<CacheContinuousQueryEntry> backupFlushEntries() { // Builds the list of entries to flush from this batch's array; returns null when nothing is produced.
+ List<CacheContinuousQueryEntry> res = null; // Allocated lazily on the first flush entry.
+
+ long filtered = this.filtered; // Local running count, seeded with the filtered count carried into this batch.
+ long cntr = startCntr; // Update counter corresponding to entries[i]; advanced once per slot.
+
+ for (int i = 0; i < entries.length; i++) {
+ CacheContinuousQueryEntry e = entries[i];
+
+ CacheContinuousQueryEntry flushEntry = null;
+
+ if (e == null) { // Empty slot: emit the pending filtered run, if any.
+ if (filtered != 0) {
+ flushEntry = filteredEntry(cntr - 1, filtered - 1); // Synthetic entry at the previous counter; presumably it accounts for one filtered update itself, hence filtered - 1 - TODO confirm.
+
+ filtered = 0;
+ }
+ }
+ else {
+ if (e.isFiltered())
+ filtered++; // Extend the current run of filtered updates.
+ else {
+ flushEntry = new CacheContinuousQueryEntry(e.cacheId(), // Copy of the stored entry, so the original's filtered count is not modified here.
+ e.eventType(),
+ e.key(),
+ e.value(),
+ e.oldValue(),
+ e.isKeepBinary(),
+ e.partition(),
+ e.updateCounter(),
+ e.topologyVersion());
+
+ flushEntry.filteredCount(filtered); // Attach the filtered run accumulated before this entry.
+
+ filtered = 0;
+ }
+ }
+
+ if (flushEntry != null) {
+ if (res == null)
+ res = new ArrayList<>();
+
+ res.add(flushEntry);
+ }
+
+ cntr++;
+ }
+
+ if (filtered != 0L) { // Trailing filtered run that reached the end of the array.
+ if (res == null)
+ res = new ArrayList<>();
+
+ res.add(filteredEntry(cntr - 1, filtered - 1)); // cntr - 1 is the counter of the last slot after the loop.
+ }
+
+ return res;
+ }
+
+ private CacheContinuousQueryEntry filteredEntry(long cntr, long filtered) { // Creates a placeholder entry for update counter 'cntr', marked filtered and carrying 'filtered' as its filtered count.
+ CacheContinuousQueryEntry e = new CacheContinuousQueryEntry(0, // Argument order mirrors the copy in backupFlushEntries: cache id 0, then no event type/key/value/old value.
+ null,
+ null,
+ null,
+ null,
+ false,
+ part, // Partition of the enclosing buffer.
+ cntr,
+ topVer); // Topology version this batch was created with.
+
+ e.markFiltered();
+
+ e.filteredCount(filtered);
+
+ return e;
+ }
+
/**
* @param res Current result.
- * @param cntr Event counter.
- * @param evt Event.
+ * @param cntr Entry counter.
+ * @param entry Entry.
* @param backup Backup entry flag.
* @return New result.
*/
@@ -261,7 +366,7 @@ public class CacheContinuousQueryEventBuffer {
@Nullable private Object processEvent0(
@Nullable Object res,
long cntr,
- CacheContinuousQueryEntry evt,
+ CacheContinuousQueryEntry entry,
boolean backup) {
int pos = (int)(cntr - startCntr);
@@ -271,56 +376,67 @@ public class CacheContinuousQueryEventBuffer {
"buffer process start=" + startCntr +
", lastProc=" + lastProc +
" pos=" + pos +
- " topVer=" + ((CacheContinuousQueryEntry)evt).topologyVersion());
+ " topVer=" + entry.topologyVersion());
- evts[pos] = evt;
+ entries[pos] = entry;
int next = lastProc + 1;
if (next == pos) {
- for (int i = next; i < evts.length; i++) {
- Object e = evts[i];
-
- if (e != null) {
- if (e.getClass() == Long.class)
- filtered++;
- else {
- CacheContinuousQueryEntry evt0 = (CacheContinuousQueryEntry)e;
+ for (int i = next; i < entries.length; i++) {
+ CacheContinuousQueryEntry entry0 = entries[i];
+
+ if (entry0 != null) {
+ if (!entry0.isFiltered()) {
+ TestDebugLog.addEntryMessage(part,
+ cntr,
+ "buffer process res start=" + startCntr +
+ ", lastProc=" + lastProc +
+ " pos=" + pos +
+ ", filtered=" + filtered +
+ " topVer=" + entry0.topologyVersion());
+
+ entry0.filteredCount(filtered);
+
+ filtered = 0;
+
+ if (res == null) {
+ if (backup)
+ backupQ.add(entry0);
+ else
+ res = entry0;
+ }
+ else {
+ assert !backup;
- if (!evt0.isFiltered()) {
- evt0.filteredCount(filtered);
+ List<CacheContinuousQueryEntry> resList;
- filtered = 0;
+ if (res instanceof CacheContinuousQueryEntry) {
+ resList = new ArrayList<>();
- if (res == null) {
- if (backup)
- backupQ.add(evt0);
- else
- res = evt0;
+ resList.add((CacheContinuousQueryEntry)res);
}
else {
- assert !backup;
-
- List<CacheContinuousQueryEntry> resList;
-
- if (res instanceof CacheContinuousQueryEntry) {
- resList = new ArrayList<>();
-
- resList.add((CacheContinuousQueryEntry)res);
- }
- else {
- assert res instanceof List : res;
+ assert res instanceof List : res;
- resList = (List<CacheContinuousQueryEntry>)res;
- }
+ resList = (List<CacheContinuousQueryEntry>)res;
+ }
- resList.add(evt0);
+ resList.add(entry0);
- res = resList;
- }
+ res = resList;
}
- else
- filtered++;
+ }
+ else {
+ filtered++;
+
+ TestDebugLog.addEntryMessage(part,
+ cntr,
+ "buffer process inc filtered start=" + startCntr +
+ ", lastProc=" + lastProc +
+ " pos=" + pos +
+ ", filtered=" + filtered +
+ " topVer=" + entry0.topologyVersion());
}
pos = i;
@@ -335,10 +451,10 @@ public class CacheContinuousQueryEventBuffer {
return res;
}
- if (pos == evts.length -1) {
- Arrays.fill(evts, null);
+ if (pos == entries.length -1) {
+ Arrays.fill(entries, null);
- Batch nextBatch = new Batch(this.startCntr + BUF_SIZE, filtered, evts);
+ Batch nextBatch = new Batch(this.startCntr + BUF_SIZE, filtered, entries, entry.topologyVersion());
curBatch.set(nextBatch);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d2942a58/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java
index 12eaa20..e031428 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java
@@ -41,7 +41,13 @@ import static org.apache.ignite.internal.processors.cache.query.continuous.Cache
*/
class CacheContinuousQueryPartitionRecovery {
/** Event which means hole in sequence. */
- private static final CacheContinuousQueryEntry HOLE = new CacheContinuousQueryEntry();
+ private static final CacheContinuousQueryEntry HOLE;
+
+ static {
+ HOLE = new CacheContinuousQueryEntry();
+
+ HOLE.markFiltered();
+ }
/** */
private final static int MAX_BUFF_SIZE = LSNR_MAX_BUF_SIZE;
@@ -56,7 +62,7 @@ class CacheContinuousQueryPartitionRecovery {
private AffinityTopologyVersion curTop = AffinityTopologyVersion.NONE;
/** */
- private final Map<Long, CacheContinuousQueryEntry> pendingEvts = new TreeMap<>();
+ private final TreeMap<Long, CacheContinuousQueryEntry> pendingEvts = new TreeMap<>();
/**
* @param log Logger.
@@ -240,6 +246,8 @@ class CacheContinuousQueryPartitionRecovery {
}
}
else {
+ boolean skip = false;
+
while (iter.hasNext()) {
Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next();
@@ -269,7 +277,9 @@ class CacheContinuousQueryPartitionRecovery {
iter.remove();
}
- else {
+ else if (!pending.isFiltered()) {
+ skip = true;
+
TestDebugLog.addEntryMessage(entry.partition(),
entry.updateCounter(),
"stop process last=" + lastFiredEvt + " cntr=" + e.getKey() + " topVer=" + e.getValue().topologyVersion() + " f=" + pending.filteredCount());
@@ -277,6 +287,9 @@ class CacheContinuousQueryPartitionRecovery {
break;
}
}
+
+ if (skip)
+ pendingEvts.headMap(lastFiredEvt).clear();
}
}