You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/12/14 13:03:04 UTC
[44/50] [abbrv] ignite git commit: ignite-1.5 Fixed
CacheContinuousQueryFailoverAbstractSelfTest so that it does not hang in
case of errors.
ignite-1.5 Fixed CacheContinuousQueryFailoverAbstractSelfTest so that it does not hang in case of errors.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4291edca
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4291edca
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4291edca
Branch: refs/heads/ignite-2100
Commit: 4291edca326be4eafde331001238a9181333fbfc
Parents: 72e5b9a
Author: sboikov <sb...@gridgain.com>
Authored: Mon Dec 14 12:01:51 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Dec 14 12:01:51 2015 +0300
----------------------------------------------------------------------
...ContinuousQueryFailoverAbstractSelfTest.java | 186 ++++++++++---------
1 file changed, 100 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4291edca/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index 08e8adb..5a4ba14 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -333,7 +333,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
List<Integer> keys = testKeys(grid(0).cache(null), 10);
for (Integer key : keys) {
- IgniteCache cache = null;
+ IgniteCache<Object, Object> cache = null;
if (rnd.nextBoolean())
cache = qryClient.cache(null);
@@ -462,7 +462,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
assert lsnr.evts.isEmpty();
- QueryCursor<Cache.Entry<Object, Object>> query = clnCache.query(qry);
+ QueryCursor<Cache.Entry<Object, Object>> qryCur = clnCache.query(qry);
Map<Object, T2<Object, Object>> updates = new HashMap<>();
@@ -505,7 +505,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
checkEvents(expEvts, lsnr, false);
- query.close();
+ qryCur.close();
}
}
@@ -538,7 +538,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
IgniteCache<Object, Object> clnCache = qryClient.cache(null);
- QueryCursor<Cache.Entry<Object, Object>> query = clnCache.query(qry);
+ QueryCursor<Cache.Entry<Object, Object>> qryCur = clnCache.query(qry);
Ignite igniteSrv = ignite(0);
@@ -663,7 +663,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
checkEvents(expEvts, lsnr, false);
- query.close();
+ qryCur.close();
}
/**
@@ -992,8 +992,10 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
* @param expEvts Expected events.
* @param lsnr Listener.
* @param lostAllow If {@code true} than won't assert on lost events.
+ * @throws Exception If failed.
*/
- private void checkEvents(final List<T3<Object, Object, Object>> expEvts, final CacheEventListener2 lsnr,
+ private void checkEvents(final List<T3<Object, Object, Object>> expEvts,
+ final CacheEventListener2 lsnr,
boolean lostAllow) throws Exception {
checkEvents(expEvts, lsnr, lostAllow, true);
}
@@ -1002,6 +1004,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
* @param expEvts Expected events.
* @param lsnr Listener.
* @param lostAllow If {@code true} than won't assert on lost events.
+ * @param wait Wait flag.
+ * @throws Exception If failed.
*/
private void checkEvents(final List<T3<Object, Object, Object>> expEvts, final CacheEventListener2 lsnr,
boolean lostAllow, boolean wait) throws Exception {
@@ -1017,7 +1021,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
for (Map.Entry<Integer, List<CacheEntryEvent<?, ?>>> e : lsnr.evts.entrySet())
prevMap.put(e.getKey(), new ArrayList<>(e.getValue()));
- List<T3<Object, Object, Object>> lostEvents = new ArrayList<>();
+ List<T3<Object, Object, Object>> lostEvts = new ArrayList<>();
for (T3<Object, Object, Object> exp : expEvts) {
List<CacheEntryEvent<?, ?>> rcvdEvts = lsnr.evts.get(exp.get1());
@@ -1026,7 +1030,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
continue;
if (rcvdEvts == null || rcvdEvts.isEmpty()) {
- lostEvents.add(exp);
+ lostEvts.add(exp);
continue;
}
@@ -1050,7 +1054,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
// Lost event is acceptable.
if (!found)
- lostEvents.add(exp);
+ lostEvts.add(exp);
}
boolean dup = false;
@@ -1062,11 +1066,11 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
for (CacheEntryEvent<?, ?> e : evts) {
boolean found = false;
- for (T3<Object, Object, Object> lostEvt : lostEvents) {
+ for (T3<Object, Object, Object> lostEvt : lostEvts) {
if (e.getKey().equals(lostEvt.get1()) && e.getValue().equals(lostEvt.get2())) {
found = true;
- lostEvents.remove(lostEvt);
+ lostEvts.remove(lostEvt);
break;
}
@@ -1091,16 +1095,16 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
}
}
- if (!lostAllow && lostEvents.size() > 100) {
- log.error("Lost event cnt: " + lostEvents.size());
+ if (!lostAllow && lostEvts.size() > 100) {
+ log.error("Lost event cnt: " + lostEvts.size());
- for (T3<Object, Object, Object> e : lostEvents)
+ for (T3<Object, Object, Object> e : lostEvts)
log.error("Lost event: " + e);
fail("Lose events, see log for details.");
}
- log.error("Lost event cnt: " + lostEvents.size());
+ log.error("Lost event cnt: " + lostEvts.size());
expEvts.clear();
@@ -1126,8 +1130,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
* @param lsnr Listener.
*/
private void checkEvents(final List<T3<Object, Object, Object>> expEvts, final CacheEventListener3 lsnr,
- boolean allowLoseEvent) throws Exception {
- if (!allowLoseEvent)
+ boolean allowLoseEvt) throws Exception {
+ if (!allowLoseEvt)
assert GridTestUtils.waitForCondition(new PA() {
@Override public boolean apply() {
return lsnr.evts.size() == expEvts.size();
@@ -1140,11 +1144,11 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
assertNotNull("No event for key: " + exp.get1(), e);
assertEquals("Unexpected value: " + e, exp.get2(), e.getValue());
- if (allowLoseEvent)
+ if (allowLoseEvt)
lsnr.evts.remove(exp.get1());
}
- if (allowLoseEvent)
+ if (allowLoseEvt)
assert lsnr.evts.isEmpty();
expEvts.clear();
@@ -1385,17 +1389,17 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
awaitPartitionMapExchange();
- List<T3<Object, Object, Object>> afterRestEvents = new ArrayList<>();
+ List<T3<Object, Object, Object>> afterRestEvts = new ArrayList<>();
for (int j = 0; j < aff.partitions(); j++) {
Integer oldVal = (Integer)qryClnCache.get(j);
qryClnCache.put(j, i);
- afterRestEvents.add(new T3<>((Object)j, (Object)i, (Object)oldVal));
+ afterRestEvts.add(new T3<>((Object)j, (Object)i, (Object)oldVal));
}
- checkEvents(new ArrayList<>(afterRestEvents), lsnr, false);
+ checkEvents(new ArrayList<>(afterRestEvts), lsnr, false);
log.info("Start node: " + idx);
@@ -1406,9 +1410,10 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
}
/**
+ * @param backups Number of backups.
* @throws Exception If failed.
*/
- public void failoverStartStopFilter(int backups) throws Exception {
+ private void failoverStartStopFilter(int backups) throws Exception {
this.backups = backups;
final int SRV_NODES = 4;
@@ -1629,22 +1634,22 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
dinLsnr.vals.clear();
}
- List<T3<Object, Object, Object>> afterRestEvents = new ArrayList<>();
+ List<T3<Object, Object, Object>> afterRestEvts = new ArrayList<>();
for (int i = 0; i < qryClient.affinity(null).partitions(); i++) {
Integer oldVal = (Integer)qryClnCache.get(i);
qryClnCache.put(i, i);
- afterRestEvents.add(new T3<>((Object)i, (Object)i, (Object)oldVal));
+ afterRestEvts.add(new T3<>((Object)i, (Object)i, (Object)oldVal));
}
- checkEvents(new ArrayList<>(afterRestEvents), lsnr, false);
+ checkEvents(new ArrayList<>(afterRestEvts), lsnr, false);
cur.close();
if (dinQry != null) {
- checkEvents(new ArrayList<>(afterRestEvents), dinLsnr, false);
+ checkEvents(new ArrayList<>(afterRestEvts), dinLsnr, false);
dinQry.close();
}
@@ -1695,81 +1700,90 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
IgniteInternalFuture<?> restartFut = GridTestUtils.runAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
- while (!stop.get() && !err) {
- final int idx = rnd.nextInt(SRV_NODES);
-
- log.info("Stop node: " + idx);
-
- stopGrid(idx);
-
- Thread.sleep(300);
-
- GridTestUtils.waitForCondition(new PA() {
- @Override public boolean apply() {
- return qryCln.cluster().nodes().size() == SRV_NODES;
- }
- }, 5000L);
+ try {
+ while (!stop.get() && !err) {
+ final int idx = rnd.nextInt(SRV_NODES);
- try {
- log.info("Start node: " + idx);
+ log.info("Stop node: " + idx);
- startGrid(idx);
+ stopGrid(idx);
Thread.sleep(300);
GridTestUtils.waitForCondition(new PA() {
@Override public boolean apply() {
- return qryCln.cluster().nodes().size() == SRV_NODES + 1;
+ return qryCln.cluster().nodes().size() == SRV_NODES;
}
}, 5000L);
- }
- catch (Exception e) {
- log.warning("Failed to stop nodes.", e);
- }
- CyclicBarrier bar = new CyclicBarrier(THREAD + 1 /* plus start/stop thread */, new Runnable() {
- @Override public void run() {
- try {
- int size0 = 0;
+ try {
+ log.info("Start node: " + idx);
- for (List<T3<Object, Object, Object>> evt : expEvts)
- size0 += evt.size();
+ startGrid(idx);
- final int size = size0;
+ Thread.sleep(300);
- GridTestUtils.waitForCondition(new PA() {
- @Override public boolean apply() {
- return lsnr.size() <= size;
- }
- }, 2000L);
+ GridTestUtils.waitForCondition(new PA() {
+ @Override public boolean apply() {
+ return qryCln.cluster().nodes().size() == SRV_NODES + 1;
+ }
+ }, 5000L);
+ }
+ catch (Exception e) {
+ log.warning("Failed to stop nodes.", e);
+ }
- List<T3<Object, Object, Object>> expEvts0 = new ArrayList<>();
+ CyclicBarrier bar = new CyclicBarrier(THREAD + 1 /* plus start/stop thread */, new Runnable() {
+ @Override public void run() {
+ try {
+ int size0 = 0;
- for (List<T3<Object, Object, Object>> evt : expEvts)
- expEvts0.addAll(evt);
+ for (List<T3<Object, Object, Object>> evt : expEvts)
+ size0 += evt.size();
- checkEvents(expEvts0, lsnr, false, false);
+ final int size = size0;
- for (List<T3<Object, Object, Object>> evt : expEvts)
- evt.clear();
- }
- catch (Exception e) {
- log.error("Failed.", e);
+ GridTestUtils.waitForCondition(new PA() {
+ @Override public boolean apply() {
+ return lsnr.size() <= size;
+ }
+ }, 2000L);
- err = true;
+ List<T3<Object, Object, Object>> expEvts0 = new ArrayList<>();
- stop.set(true);
- }
- finally {
- checkBarrier.set(null);
+ for (List<T3<Object, Object, Object>> evt : expEvts)
+ expEvts0.addAll(evt);
+
+ checkEvents(expEvts0, lsnr, false, false);
+
+ for (List<T3<Object, Object, Object>> evt : expEvts)
+ evt.clear();
+ }
+ catch (Exception e) {
+ log.error("Failed.", e);
+
+ err = true;
+
+ stop.set(true);
+ }
+ finally {
+ checkBarrier.set(null);
+ }
}
- }
- });
+ });
- assertTrue(checkBarrier.compareAndSet(null, bar));
+ assertTrue(checkBarrier.compareAndSet(null, bar));
+
+ if (!stop.get() && !err)
+ bar.await(1, MINUTES);
+ }
+ }
+ catch (Throwable e) {
+ log.error("Unexpected error: " + e, e);
+
+ err = true;
- if (!stop.get() && !err)
- bar.await(1, MINUTES);
+ throw e;
}
return null;
@@ -1803,7 +1817,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
CyclicBarrier bar = checkBarrier.get();
if (bar != null)
- bar.await();
+ bar.await(1, MINUTES);
}
}
catch (Exception e){
@@ -2159,13 +2173,13 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
private final ConcurrentHashMap<Object, CacheEntryEvent<?, ?>> evts = new ConcurrentHashMap<>();
/** {@inheritDoc} */
- @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> events) throws CacheEntryListenerException {
- for (CacheEntryEvent<?, ?> e : events) {
+ @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) throws CacheEntryListenerException {
+ for (CacheEntryEvent<?, ?> e : evts) {
Integer key = (Integer)e.getKey();
keys.add(key);
- assert evts.put(key, e) == null;
+ assert this.evts.put(key, e) == null;
}
}
@@ -2180,8 +2194,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
*/
public static class CacheEventFilter implements CacheEntryEventSerializableFilter<Object, Object> {
/** {@inheritDoc} */
- @Override public boolean evaluate(CacheEntryEvent<?, ?> event) throws CacheEntryListenerException {
- return ((Integer)event.getValue()) >= 0;
+ @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) throws CacheEntryListenerException {
+ return ((Integer)evt.getValue()) >= 0;
}
}