You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2015/11/02 19:03:37 UTC
ignite git commit: IGNITE-426 WIP
Repository: ignite
Updated Branches:
refs/heads/ignite-426-2-reb 9dd18c735 -> 0834da6de
IGNITE-426 WIP
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0834da6d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0834da6d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0834da6d
Branch: refs/heads/ignite-426-2-reb
Commit: 0834da6ded01135a4b50a143f76892bca69059e4
Parents: 9dd18c7
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Mon Nov 2 21:03:23 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Mon Nov 2 21:03:23 2015 +0300
----------------------------------------------------------------------
.../continuous/CacheContinuousQueryHandler.java | 87 ++++++++++-------
...acheContinuousQueryFailoverAbstractTest.java | 98 +++++++++++++++-----
2 files changed, 128 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/0834da6d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index cb0ba5a..1df5963 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -29,12 +29,13 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeMap;
-import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryUpdatedListener;
import javax.cache.event.EventType;
@@ -54,7 +55,6 @@ import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
-import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager;
import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
@@ -62,6 +62,7 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter;
import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryFilter;
+import org.apache.ignite.internal.util.GridConcurrentSkipListSet;
import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.C1;
@@ -752,10 +753,27 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
*/
private static class HoleBuffer {
/** */
- private final TreeSet<Long> buf = new TreeSet<>();
+ private final NavigableSet<Long> buf = new GridConcurrentSkipListSet<>();
/** */
- private long lastFiredEvt;
+ private AtomicLong lastFiredCntr = new AtomicLong();
+
+ /**
+ * @param newVal New value.
+ * @return Previous value if it was less than the new value, otherwise {@code -1}.
+ */
+ private long setLastFiredCounter(long newVal) {
+ long prevVal = lastFiredCntr.get();
+
+ while (prevVal < newVal) {
+ if (lastFiredCntr.compareAndSet(prevVal, newVal))
+ return prevVal;
+ else
+ prevVal = lastFiredCntr.get();
+ }
+
+ return prevVal >= newVal ? -1 : prevVal;
+ }
/**
* Add continuous entry.
@@ -766,50 +784,51 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
public CacheContinuousQueryEntry handle(CacheContinuousQueryEntry e) {
assert e != null;
- synchronized (buf) {
- // Handle filtered events.
- if (e.isFiltered()) {
- if (lastFiredEvt > e.updateIndex() || e.updateIndex() == 1)
- return e;
-
+ if (e.isFiltered()) {
+ if (lastFiredCntr.get() > e.updateIndex() || e.updateIndex() == 1)
+ return e;
+ else {
buf.add(e.updateIndex());
- return null;
+ // Double check: another thread may have sent an event with a counter higher than this event's.
+ if (lastFiredCntr.get() > e.updateIndex() && buf.contains(e.updateIndex())) {
+ buf.remove(e.updateIndex());
+
+ return e;
+ }
+ else
+ return null;
}
+ }
+ else {
+ long prevVal = setLastFiredCounter(e.updateIndex());
+
+ if (prevVal == -1)
+ return e;
else {
- if (lastFiredEvt < e.updateIndex())
- lastFiredEvt = e.updateIndex();
+ NavigableSet<Long> prevHoles = buf.subSet(prevVal, true, e.updateIndex(), true);
- // Doesn't have filtered and delayed events.
- if (buf.isEmpty() || buf.first() > e.updateIndex())
- return e;
- else {
- GridLongList filteredEvts = new GridLongList(buf.size());
+ GridLongList filteredEvts = new GridLongList(10);
- int size = 0;
+ int size = 0;
- Iterator<Long> iter = buf.iterator();
+ Iterator<Long> iter = prevHoles.iterator();
- while (iter.hasNext()) {
- long idx = iter.next();
+ while (iter.hasNext()) {
+ long idx = iter.next();
- if (idx < e.updateIndex()) {
- filteredEvts.add(idx);
+ filteredEvts.add(idx);
- iter.remove();
+ iter.remove();
- ++size;
- }
- else
- break;
- }
+ ++size;
+ }
- filteredEvts.truncate(size, true);
+ filteredEvts.truncate(size, true);
- e.filteredEvents(filteredEvts);
+ e.filteredEvents(filteredEvts);
- return e;
- }
+ return e;
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0834da6d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
index b31b842..95781e0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
@@ -30,9 +30,6 @@ import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadLocalRandom;
@@ -91,7 +88,6 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
-import org.eclipse.jetty.util.ConcurrentHashSet;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -900,17 +896,25 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
*/
private void checkEvents(final List<T3<Object, Object, Object>> expEvts, final CacheEventListener2 lsnr,
boolean lostAllow) throws Exception {
- boolean b = GridTestUtils.waitForCondition(new PA() {
+ GridTestUtils.waitForCondition(new PA() {
@Override public boolean apply() {
return expEvts.size() == lsnr.size();
}
}, 2000L);
+ Map<Integer, List<CacheEntryEvent<?, ?>>> prevMap = new HashMap<>(lsnr.evts.size());
+
+ for (Map.Entry<Integer, List<CacheEntryEvent<?, ?>>> e : lsnr.evts.entrySet())
+ prevMap.put(e.getKey(), new ArrayList<>(e.getValue()));
+
List<T3<Object, Object, Object>> lostEvents = new ArrayList<>();
for (T3<Object, Object, Object> exp : expEvts) {
List<CacheEntryEvent<?, ?>> rcvdEvts = lsnr.evts.get(exp.get1());
+ if (F.eq(exp.get2(), exp.get3()))
+ continue;
+
if (rcvdEvts == null || rcvdEvts.isEmpty()) {
lostEvents.add(exp);
@@ -949,8 +953,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
boolean found = false;
for (T3<Object, Object, Object> lostEvt : lostEvents) {
- if (e.getKey().equals(lostEvt.get1()) && e.getValue().equals(lostEvt.get2())
- /*&& equalOldValue(e, lostEvt)*/) {
+ if (e.getKey().equals(lostEvt.get1()) && e.getValue().equals(lostEvt.get2())) {
found = true;
lostEvents.remove(lostEvt);
@@ -972,12 +975,20 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
for (T3<Object, Object, Object> e : lostEvents)
log.error("Lost event: " + e);
- for (List<CacheEntryEvent<?, ?>> e : lsnr.evts.values())
- if (!e.isEmpty())
- log.error("Duplicate event: " + e);
- }
+ for (List<CacheEntryEvent<?, ?>> e : lsnr.evts.values()) {
+ if (!e.isEmpty()) {
+ for (CacheEntryEvent<?, ?> event : e) {
+ List<CacheEntryEvent<?, ?>> entries = new ArrayList<>();
- assertFalse("Received duplicate events, see log for details.", !lostEvents.isEmpty());
+ for (CacheEntryEvent<?, ?> ev0 : prevMap.get(event.getKey())) {
+ if (F.eq(event.getValue(), ev0.getValue()) && F.eq(event.getOldValue(),
+ ev0.getOldValue()))
+ entries.add(ev0);
+ }
+ }
+ }
+ }
+ }
}
if (!lostAllow && !lostEvents.isEmpty()) {
@@ -1736,19 +1747,23 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
log.info("Stop node: " + idx);
+ awaitPartitionMapExchange();
+
+ Thread.sleep(400);
+
stopGrid(idx);
awaitPartitionMapExchange();
- Thread.sleep(200);
+ Thread.sleep(400);
log.info("Start node: " + idx);
startGrid(idx);
- CountDownLatch latch = new CountDownLatch(1);
+ Thread.sleep(200);
- awaitPartitionMapExchange();
+ CountDownLatch latch = new CountDownLatch(1);
assertTrue(checkLatch.compareAndSet(null, latch));
@@ -1968,7 +1983,10 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
final int PARTS = THREAD;
- final List<T3<Object, Object, Object>> expEvts = new CopyOnWriteArrayList<>();
+ final List<List<T3<Object, Object, Object>>> expEvts = new ArrayList<>(THREAD + 5);
+
+ for (int i = 0; i < THREAD; i++)
+ expEvts.add(i, new ArrayList<T3<Object, Object, Object>>());
final AtomicReference<CyclicBarrier> checkBarrier = new AtomicReference<>();
@@ -2001,7 +2019,26 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
CyclicBarrier bar = new CyclicBarrier(THREAD + 1 /* plus start/stop thread */, new Runnable() {
@Override public void run() {
try {
- checkEvents(expEvts, lsnr, false);
+ GridTestUtils.waitForCondition(new PA() {
+ @Override public boolean apply() {
+ int size = 0;
+
+ for (List<T3<Object, Object, Object>> evt : expEvts)
+ size += evt.size();
+
+ return lsnr.size() <= size;
+ }
+ }, 2000L);
+
+ List<T3<Object, Object, Object>> expEvts0 = new ArrayList<>();
+
+ for (List<T3<Object, Object, Object>> evt : expEvts)
+ expEvts0.addAll(evt);
+
+ checkEvents(expEvts0, lsnr, false);
+
+ for (List<T3<Object, Object, Object>> evt : expEvts)
+ evt.clear();
}
catch (Exception e) {
log.error("Failed.", e);
@@ -2018,8 +2055,8 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
assertTrue(checkBarrier.compareAndSet(null, bar));
- if (stop.get() && !err)
- bar.await(5, SECONDS);
+ if (!stop.get() && !err)
+ bar.await(5, MINUTES);
}
return null;
@@ -2030,11 +2067,17 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
final AtomicInteger valCntr = new AtomicInteger(0);
- GridTestUtils.runMultiThreaded(new Runnable() {
- final ThreadLocalRandom rnd = ThreadLocalRandom.current();
+ final AtomicInteger threadSeq = new AtomicInteger(0);
+ GridTestUtils.runMultiThreaded(new Runnable() {
@Override public void run() {
try {
+ final ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ final int threadId = threadSeq.getAndIncrement();
+
+ log.error("Thread id: " + threadId);
+
while (System.currentTimeMillis() < stopTime && !stop.get() && !err) {
Integer key = rnd.nextInt(PARTS);
@@ -2042,7 +2085,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
Integer prevVal = (Integer)qryClnCache.getAndPut(key, val);
- expEvts.add(new T3<>((Object)key, (Object)val, (Object)prevVal));
+ expEvts.get(threadId).add(new T3<>((Object)key, (Object)val, (Object)prevVal));
CyclicBarrier bar = checkBarrier.get();
@@ -2065,7 +2108,16 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
restartFut.get();
- checkEvents(expEvts, lsnr, true);
+ List<T3<Object, Object, Object>> expEvts0 = new ArrayList<>();
+
+ for (List<T3<Object, Object, Object>> evt : expEvts) {
+ expEvts0.addAll(evt);
+
+ evt.clear();
+ }
+
+ if (!expEvts0.isEmpty())
+ checkEvents(expEvts0, lsnr, true);
cur.close();