You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/12/14 13:03:04 UTC

[44/50] [abbrv] ignite git commit: ignite-1.5 Fixed CacheContinuousQueryFailoverAbstractSelfTest so that it does not hang in case of errors.

ignite-1.5 Fixed CacheContinuousQueryFailoverAbstractSelfTest so that it does not hang in case of errors.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4291edca
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4291edca
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4291edca

Branch: refs/heads/ignite-2100
Commit: 4291edca326be4eafde331001238a9181333fbfc
Parents: 72e5b9a
Author: sboikov <sb...@gridgain.com>
Authored: Mon Dec 14 12:01:51 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Dec 14 12:01:51 2015 +0300

----------------------------------------------------------------------
 ...ContinuousQueryFailoverAbstractSelfTest.java | 186 ++++++++++---------
 1 file changed, 100 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4291edca/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index 08e8adb..5a4ba14 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -333,7 +333,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
             List<Integer> keys = testKeys(grid(0).cache(null), 10);
 
             for (Integer key : keys) {
-                IgniteCache cache = null;
+                IgniteCache<Object, Object> cache = null;
 
                 if (rnd.nextBoolean())
                     cache = qryClient.cache(null);
@@ -462,7 +462,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
 
             assert lsnr.evts.isEmpty();
 
-            QueryCursor<Cache.Entry<Object, Object>> query = clnCache.query(qry);
+            QueryCursor<Cache.Entry<Object, Object>> qryCur = clnCache.query(qry);
 
             Map<Object, T2<Object, Object>> updates = new HashMap<>();
 
@@ -505,7 +505,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
 
             checkEvents(expEvts, lsnr, false);
 
-            query.close();
+            qryCur.close();
         }
     }
 
@@ -538,7 +538,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
 
         IgniteCache<Object, Object> clnCache = qryClient.cache(null);
 
-        QueryCursor<Cache.Entry<Object, Object>> query = clnCache.query(qry);
+        QueryCursor<Cache.Entry<Object, Object>> qryCur = clnCache.query(qry);
 
         Ignite igniteSrv = ignite(0);
 
@@ -663,7 +663,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
 
         checkEvents(expEvts, lsnr, false);
 
-        query.close();
+        qryCur.close();
     }
 
     /**
@@ -992,8 +992,10 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
      * @param expEvts Expected events.
      * @param lsnr Listener.
      * @param lostAllow If {@code true} than won't assert on lost events.
+     * @throws Exception If failed.
      */
-    private void checkEvents(final List<T3<Object, Object, Object>> expEvts, final CacheEventListener2 lsnr,
+    private void checkEvents(final List<T3<Object, Object, Object>> expEvts,
+        final CacheEventListener2 lsnr,
         boolean lostAllow) throws Exception {
         checkEvents(expEvts, lsnr, lostAllow, true);
     }
@@ -1002,6 +1004,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
      * @param expEvts Expected events.
      * @param lsnr Listener.
      * @param lostAllow If {@code true} than won't assert on lost events.
+     * @param wait Wait flag.
+     * @throws Exception If failed.
      */
     private void checkEvents(final List<T3<Object, Object, Object>> expEvts, final CacheEventListener2 lsnr,
         boolean lostAllow, boolean wait) throws Exception {
@@ -1017,7 +1021,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
         for (Map.Entry<Integer, List<CacheEntryEvent<?, ?>>> e : lsnr.evts.entrySet())
             prevMap.put(e.getKey(), new ArrayList<>(e.getValue()));
 
-        List<T3<Object, Object, Object>> lostEvents = new ArrayList<>();
+        List<T3<Object, Object, Object>> lostEvts = new ArrayList<>();
 
         for (T3<Object, Object, Object> exp : expEvts) {
             List<CacheEntryEvent<?, ?>> rcvdEvts = lsnr.evts.get(exp.get1());
@@ -1026,7 +1030,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
                 continue;
 
             if (rcvdEvts == null || rcvdEvts.isEmpty()) {
-                lostEvents.add(exp);
+                lostEvts.add(exp);
 
                 continue;
             }
@@ -1050,7 +1054,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
 
             // Lost event is acceptable.
             if (!found)
-                lostEvents.add(exp);
+                lostEvts.add(exp);
         }
 
         boolean dup = false;
@@ -1062,11 +1066,11 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
                     for (CacheEntryEvent<?, ?> e : evts) {
                         boolean found = false;
 
-                        for (T3<Object, Object, Object> lostEvt : lostEvents) {
+                        for (T3<Object, Object, Object> lostEvt : lostEvts) {
                             if (e.getKey().equals(lostEvt.get1()) && e.getValue().equals(lostEvt.get2())) {
                                 found = true;
 
-                                lostEvents.remove(lostEvt);
+                                lostEvts.remove(lostEvt);
 
                                 break;
                             }
@@ -1091,16 +1095,16 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
             }
         }
 
-        if (!lostAllow && lostEvents.size() > 100) {
-            log.error("Lost event cnt: " + lostEvents.size());
+        if (!lostAllow && lostEvts.size() > 100) {
+            log.error("Lost event cnt: " + lostEvts.size());
 
-            for (T3<Object, Object, Object> e : lostEvents)
+            for (T3<Object, Object, Object> e : lostEvts)
                 log.error("Lost event: " + e);
 
             fail("Lose events, see log for details.");
         }
 
-        log.error("Lost event cnt: " + lostEvents.size());
+        log.error("Lost event cnt: " + lostEvts.size());
 
         expEvts.clear();
 
@@ -1126,8 +1130,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
      * @param lsnr Listener.
      */
     private void checkEvents(final List<T3<Object, Object, Object>> expEvts, final CacheEventListener3 lsnr,
-        boolean allowLoseEvent) throws Exception {
-        if (!allowLoseEvent)
+        boolean allowLoseEvt) throws Exception {
+        if (!allowLoseEvt)
             assert GridTestUtils.waitForCondition(new PA() {
                 @Override public boolean apply() {
                     return lsnr.evts.size() == expEvts.size();
@@ -1140,11 +1144,11 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
             assertNotNull("No event for key: " + exp.get1(), e);
             assertEquals("Unexpected value: " + e, exp.get2(), e.getValue());
 
-            if (allowLoseEvent)
+            if (allowLoseEvt)
                 lsnr.evts.remove(exp.get1());
         }
 
-        if (allowLoseEvent)
+        if (allowLoseEvt)
             assert lsnr.evts.isEmpty();
 
         expEvts.clear();
@@ -1385,17 +1389,17 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
 
             awaitPartitionMapExchange();
 
-            List<T3<Object, Object, Object>> afterRestEvents = new ArrayList<>();
+            List<T3<Object, Object, Object>> afterRestEvts = new ArrayList<>();
 
             for (int j = 0; j < aff.partitions(); j++) {
                 Integer oldVal = (Integer)qryClnCache.get(j);
 
                 qryClnCache.put(j, i);
 
-                afterRestEvents.add(new T3<>((Object)j, (Object)i, (Object)oldVal));
+                afterRestEvts.add(new T3<>((Object)j, (Object)i, (Object)oldVal));
             }
 
-            checkEvents(new ArrayList<>(afterRestEvents), lsnr, false);
+            checkEvents(new ArrayList<>(afterRestEvts), lsnr, false);
 
             log.info("Start node: " + idx);
 
@@ -1406,9 +1410,10 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
     }
 
     /**
+     * @param backups Number of backups.
      * @throws Exception If failed.
      */
-    public void failoverStartStopFilter(int backups) throws Exception {
+    private void failoverStartStopFilter(int backups) throws Exception {
         this.backups = backups;
 
         final int SRV_NODES = 4;
@@ -1629,22 +1634,22 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
             dinLsnr.vals.clear();
         }
 
-        List<T3<Object, Object, Object>> afterRestEvents = new ArrayList<>();
+        List<T3<Object, Object, Object>> afterRestEvts = new ArrayList<>();
 
         for (int i = 0; i < qryClient.affinity(null).partitions(); i++) {
             Integer oldVal = (Integer)qryClnCache.get(i);
 
             qryClnCache.put(i, i);
 
-            afterRestEvents.add(new T3<>((Object)i, (Object)i, (Object)oldVal));
+            afterRestEvts.add(new T3<>((Object)i, (Object)i, (Object)oldVal));
         }
 
-        checkEvents(new ArrayList<>(afterRestEvents), lsnr, false);
+        checkEvents(new ArrayList<>(afterRestEvts), lsnr, false);
 
         cur.close();
 
         if (dinQry != null) {
-            checkEvents(new ArrayList<>(afterRestEvents), dinLsnr, false);
+            checkEvents(new ArrayList<>(afterRestEvts), dinLsnr, false);
 
             dinQry.close();
         }
@@ -1695,81 +1700,90 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
 
         IgniteInternalFuture<?> restartFut = GridTestUtils.runAsync(new Callable<Void>() {
             @Override public Void call() throws Exception {
-                while (!stop.get() && !err) {
-                    final int idx = rnd.nextInt(SRV_NODES);
-
-                    log.info("Stop node: " + idx);
-
-                    stopGrid(idx);
-
-                    Thread.sleep(300);
-
-                    GridTestUtils.waitForCondition(new PA() {
-                        @Override public boolean apply() {
-                            return qryCln.cluster().nodes().size() == SRV_NODES;
-                        }
-                    }, 5000L);
+                try {
+                    while (!stop.get() && !err) {
+                        final int idx = rnd.nextInt(SRV_NODES);
 
-                    try {
-                        log.info("Start node: " + idx);
+                        log.info("Stop node: " + idx);
 
-                        startGrid(idx);
+                        stopGrid(idx);
 
                         Thread.sleep(300);
 
                         GridTestUtils.waitForCondition(new PA() {
                             @Override public boolean apply() {
-                                return qryCln.cluster().nodes().size() == SRV_NODES + 1;
+                                return qryCln.cluster().nodes().size() == SRV_NODES;
                             }
                         }, 5000L);
-                    }
-                    catch (Exception e) {
-                        log.warning("Failed to stop nodes.", e);
-                    }
 
-                    CyclicBarrier bar = new CyclicBarrier(THREAD + 1 /* plus start/stop thread */, new Runnable() {
-                        @Override public void run() {
-                            try {
-                                int size0 = 0;
+                        try {
+                            log.info("Start node: " + idx);
 
-                                for (List<T3<Object, Object, Object>> evt : expEvts)
-                                    size0 += evt.size();
+                            startGrid(idx);
 
-                                final int size = size0;
+                            Thread.sleep(300);
 
-                                GridTestUtils.waitForCondition(new PA() {
-                                    @Override public boolean apply() {
-                                        return lsnr.size() <= size;
-                                    }
-                                }, 2000L);
+                            GridTestUtils.waitForCondition(new PA() {
+                                @Override public boolean apply() {
+                                    return qryCln.cluster().nodes().size() == SRV_NODES + 1;
+                                }
+                            }, 5000L);
+                        }
+                        catch (Exception e) {
+                            log.warning("Failed to stop nodes.", e);
+                        }
 
-                                List<T3<Object, Object, Object>> expEvts0 = new ArrayList<>();
+                        CyclicBarrier bar = new CyclicBarrier(THREAD + 1 /* plus start/stop thread */, new Runnable() {
+                            @Override public void run() {
+                                try {
+                                    int size0 = 0;
 
-                                for (List<T3<Object, Object, Object>> evt : expEvts)
-                                    expEvts0.addAll(evt);
+                                    for (List<T3<Object, Object, Object>> evt : expEvts)
+                                        size0 += evt.size();
 
-                                checkEvents(expEvts0, lsnr, false, false);
+                                    final int size = size0;
 
-                                for (List<T3<Object, Object, Object>> evt : expEvts)
-                                    evt.clear();
-                            }
-                            catch (Exception e) {
-                                log.error("Failed.", e);
+                                    GridTestUtils.waitForCondition(new PA() {
+                                        @Override public boolean apply() {
+                                            return lsnr.size() <= size;
+                                        }
+                                    }, 2000L);
 
-                                err = true;
+                                    List<T3<Object, Object, Object>> expEvts0 = new ArrayList<>();
 
-                                stop.set(true);
-                            }
-                            finally {
-                                checkBarrier.set(null);
+                                    for (List<T3<Object, Object, Object>> evt : expEvts)
+                                        expEvts0.addAll(evt);
+
+                                    checkEvents(expEvts0, lsnr, false, false);
+
+                                    for (List<T3<Object, Object, Object>> evt : expEvts)
+                                        evt.clear();
+                                }
+                                catch (Exception e) {
+                                    log.error("Failed.", e);
+
+                                    err = true;
+
+                                    stop.set(true);
+                                }
+                                finally {
+                                    checkBarrier.set(null);
+                                }
                             }
-                        }
-                    });
+                        });
 
-                    assertTrue(checkBarrier.compareAndSet(null, bar));
+                        assertTrue(checkBarrier.compareAndSet(null, bar));
+
+                        if (!stop.get() && !err)
+                            bar.await(1, MINUTES);
+                    }
+                }
+                catch (Throwable e) {
+                    log.error("Unexpected error: " + e, e);
+
+                    err = true;
 
-                    if (!stop.get() && !err)
-                        bar.await(1, MINUTES);
+                    throw e;
                 }
 
                 return null;
@@ -1803,7 +1817,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
                         CyclicBarrier bar = checkBarrier.get();
 
                         if (bar != null)
-                            bar.await();
+                            bar.await(1, MINUTES);
                     }
                 }
                 catch (Exception e){
@@ -2159,13 +2173,13 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
         private final ConcurrentHashMap<Object, CacheEntryEvent<?, ?>> evts = new ConcurrentHashMap<>();
 
         /** {@inheritDoc} */
-        @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> events) throws CacheEntryListenerException {
-            for (CacheEntryEvent<?, ?> e : events) {
+        @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) throws CacheEntryListenerException {
+            for (CacheEntryEvent<?, ?> e : evts) {
                 Integer key = (Integer)e.getKey();
 
                 keys.add(key);
 
-                assert evts.put(key, e) == null;
+                assert this.evts.put(key, e) == null;
             }
         }
 
@@ -2180,8 +2194,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
      */
     public static class CacheEventFilter implements CacheEntryEventSerializableFilter<Object, Object> {
         /** {@inheritDoc} */
-        @Override public boolean evaluate(CacheEntryEvent<?, ?> event) throws CacheEntryListenerException {
-            return ((Integer)event.getValue()) >= 0;
+        @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) throws CacheEntryListenerException {
+            return ((Integer)evt.getValue()) >= 0;
         }
     }