You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2015/11/02 19:03:37 UTC

ignite git commit: IGNITE-426 WIP

Repository: ignite
Updated Branches:
  refs/heads/ignite-426-2-reb 9dd18c735 -> 0834da6de


IGNITE-426 WIP


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0834da6d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0834da6d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0834da6d

Branch: refs/heads/ignite-426-2-reb
Commit: 0834da6ded01135a4b50a143f76892bca69059e4
Parents: 9dd18c7
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Mon Nov 2 21:03:23 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Mon Nov 2 21:03:23 2015 +0300

----------------------------------------------------------------------
 .../continuous/CacheContinuousQueryHandler.java | 87 ++++++++++-------
 ...acheContinuousQueryFailoverAbstractTest.java | 98 +++++++++++++++-----
 2 files changed, 128 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0834da6d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index cb0ba5a..1df5963 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -29,12 +29,13 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableSet;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
 import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryUpdatedListener;
 import javax.cache.event.EventType;
@@ -54,7 +55,6 @@ import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
-import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
@@ -62,6 +62,7 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
 import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter;
 import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
 import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryFilter;
+import org.apache.ignite.internal.util.GridConcurrentSkipListSet;
 import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.C1;
@@ -752,10 +753,27 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
      */
     private static class HoleBuffer {
         /** */
-        private final TreeSet<Long> buf = new TreeSet<>();
+        private final NavigableSet<Long> buf = new GridConcurrentSkipListSet<>();
 
         /** */
-        private long lastFiredEvt;
+        private AtomicLong lastFiredCntr = new AtomicLong();
+
+        /**
+         * @param newVal Candidate value; stored into {@code lastFiredCntr} only if it is greater than the current one.
+         * @return Value that was replaced if the counter was advanced to {@code newVal}, otherwise {@code -1}.
+         */
+        private long setLastFiredCounter(long newVal) {
+            long prevVal = lastFiredCntr.get();
+
+            while (prevVal < newVal) { // Retry the CAS while the stored counter is still behind newVal.
+                if (lastFiredCntr.compareAndSet(prevVal, newVal))
+                    return prevVal;
+                else
+                    prevVal = lastFiredCntr.get();
+            }
+
+            return prevVal >= newVal ? -1 : prevVal; // Loop exits here only with prevVal >= newVal, so this yields -1.
+        }
 
         /**
          * Add continuous entry.
@@ -766,50 +784,51 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
         public CacheContinuousQueryEntry handle(CacheContinuousQueryEntry e) {
             assert e != null;
 
-            synchronized (buf) {
-                // Handle filtered events.
-                if (e.isFiltered()) {
-                    if (lastFiredEvt > e.updateIndex() || e.updateIndex() == 1)
-                        return e;
-
+            if (e.isFiltered()) {
+                if (lastFiredCntr.get() > e.updateIndex() || e.updateIndex() == 1)
+                    return e;
+                else {
                     buf.add(e.updateIndex());
 
-                    return null;
+                    // Double check: another thread may have already sent an event with a counter higher than this one.
+                    if (lastFiredCntr.get() > e.updateIndex() && buf.contains(e.updateIndex())) {
+                        buf.remove(e.updateIndex());
+
+                        return e;
+                    }
+                    else
+                        return null;
                 }
+            }
+            else {
+                long prevVal = setLastFiredCounter(e.updateIndex());
+
+                if (prevVal == -1)
+                    return e;
                 else {
-                    if (lastFiredEvt < e.updateIndex())
-                        lastFiredEvt = e.updateIndex();
+                    NavigableSet<Long> prevHoles = buf.subSet(prevVal, true, e.updateIndex(), true);
 
-                    // Doesn't have filtered and delayed events.
-                    if (buf.isEmpty() || buf.first() > e.updateIndex())
-                        return e;
-                    else {
-                        GridLongList filteredEvts = new GridLongList(buf.size());
+                    GridLongList filteredEvts = new GridLongList(10);
 
-                        int size = 0;
+                    int size = 0;
 
-                        Iterator<Long> iter = buf.iterator();
+                    Iterator<Long> iter = prevHoles.iterator();
 
-                        while (iter.hasNext()) {
-                            long idx = iter.next();
+                    while (iter.hasNext()) {
+                        long idx = iter.next();
 
-                            if (idx < e.updateIndex()) {
-                                filteredEvts.add(idx);
+                        filteredEvts.add(idx);
 
-                                iter.remove();
+                        iter.remove();
 
-                                ++size;
-                            }
-                            else
-                                break;
-                        }
+                        ++size;
+                    }
 
-                        filteredEvts.truncate(size, true);
+                    filteredEvts.truncate(size, true);
 
-                        e.filteredEvents(filteredEvts);
+                    e.filteredEvents(filteredEvts);
 
-                        return e;
-                    }
+                    return e;
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0834da6d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
index b31b842..95781e0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
@@ -30,9 +30,6 @@ import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ThreadLocalRandom;
@@ -91,7 +88,6 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.transactions.Transaction;
-import org.eclipse.jetty.util.ConcurrentHashSet;
 
 import static java.util.concurrent.TimeUnit.MINUTES;
 import static java.util.concurrent.TimeUnit.SECONDS;
@@ -900,17 +896,25 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
      */
     private void checkEvents(final List<T3<Object, Object, Object>> expEvts, final CacheEventListener2 lsnr,
         boolean lostAllow) throws Exception {
-        boolean b = GridTestUtils.waitForCondition(new PA() {
+        GridTestUtils.waitForCondition(new PA() {
             @Override public boolean apply() {
                 return expEvts.size() == lsnr.size();
             }
         }, 2000L);
 
+        Map<Integer, List<CacheEntryEvent<?, ?>>> prevMap = new HashMap<>(lsnr.evts.size());
+
+        for (Map.Entry<Integer, List<CacheEntryEvent<?, ?>>> e : lsnr.evts.entrySet())
+            prevMap.put(e.getKey(), new ArrayList<>(e.getValue()));
+
         List<T3<Object, Object, Object>> lostEvents = new ArrayList<>();
 
         for (T3<Object, Object, Object> exp : expEvts) {
             List<CacheEntryEvent<?, ?>> rcvdEvts = lsnr.evts.get(exp.get1());
 
+            if (F.eq(exp.get2(), exp.get3()))
+                continue;
+
             if (rcvdEvts == null || rcvdEvts.isEmpty()) {
                 lostEvents.add(exp);
 
@@ -949,8 +953,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
                         boolean found = false;
 
                         for (T3<Object, Object, Object> lostEvt : lostEvents) {
-                            if (e.getKey().equals(lostEvt.get1()) && e.getValue().equals(lostEvt.get2())
-                                /*&& equalOldValue(e, lostEvt)*/) {
+                            if (e.getKey().equals(lostEvt.get1()) && e.getValue().equals(lostEvt.get2())) {
                                 found = true;
 
                                 lostEvents.remove(lostEvt);
@@ -972,12 +975,20 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
                 for (T3<Object, Object, Object> e : lostEvents)
                     log.error("Lost event: " + e);
 
-                for (List<CacheEntryEvent<?, ?>> e : lsnr.evts.values())
-                    if (!e.isEmpty())
-                        log.error("Duplicate event: " + e);
-            }
+                for (List<CacheEntryEvent<?, ?>> e : lsnr.evts.values()) {
+                    if (!e.isEmpty()) {
+                        for (CacheEntryEvent<?, ?> event : e) {
+                            List<CacheEntryEvent<?, ?>> entries = new ArrayList<>();
 
-            assertFalse("Received duplicate events, see log for details.", !lostEvents.isEmpty());
+                            for (CacheEntryEvent<?, ?> ev0 : prevMap.get(event.getKey())) {
+                                if (F.eq(event.getValue(), ev0.getValue()) && F.eq(event.getOldValue(),
+                                    ev0.getOldValue()))
+                                    entries.add(ev0);
+                            }
+                        }
+                    }
+                }
+            }
         }
 
         if (!lostAllow && !lostEvents.isEmpty()) {
@@ -1736,19 +1747,23 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
                     log.info("Stop node: " + idx);
 
+                    awaitPartitionMapExchange();
+
+                    Thread.sleep(400);
+
                     stopGrid(idx);
 
                     awaitPartitionMapExchange();
 
-                    Thread.sleep(200);
+                    Thread.sleep(400);
 
                     log.info("Start node: " + idx);
 
                     startGrid(idx);
 
-                    CountDownLatch latch = new CountDownLatch(1);
+                    Thread.sleep(200);
 
-                    awaitPartitionMapExchange();
+                    CountDownLatch latch = new CountDownLatch(1);
 
                     assertTrue(checkLatch.compareAndSet(null, latch));
 
@@ -1968,7 +1983,10 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
         final int PARTS = THREAD;
 
-        final List<T3<Object, Object, Object>> expEvts = new CopyOnWriteArrayList<>();
+        final List<List<T3<Object, Object, Object>>> expEvts = new ArrayList<>(THREAD + 5);
+
+        for (int i = 0; i < THREAD; i++)
+            expEvts.add(i, new ArrayList<T3<Object, Object, Object>>());
 
         final AtomicReference<CyclicBarrier> checkBarrier = new AtomicReference<>();
 
@@ -2001,7 +2019,26 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
                     CyclicBarrier bar = new CyclicBarrier(THREAD + 1 /* plus start/stop thread */, new Runnable() {
                         @Override public void run() {
                             try {
-                                checkEvents(expEvts, lsnr, false);
+                                GridTestUtils.waitForCondition(new PA() {
+                                    @Override public boolean apply() {
+                                        int size = 0;
+
+                                        for (List<T3<Object, Object, Object>> evt : expEvts)
+                                            size += evt.size();
+
+                                        return lsnr.size() <= size;
+                                    }
+                                }, 2000L);
+
+                                List<T3<Object, Object, Object>> expEvts0 = new ArrayList<>();
+
+                                for (List<T3<Object, Object, Object>> evt : expEvts)
+                                    expEvts0.addAll(evt);
+
+                                checkEvents(expEvts0, lsnr, false);
+
+                                for (List<T3<Object, Object, Object>> evt : expEvts)
+                                    evt.clear();
                             }
                             catch (Exception e) {
                                 log.error("Failed.", e);
@@ -2018,8 +2055,8 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
                     assertTrue(checkBarrier.compareAndSet(null, bar));
 
-                    if (stop.get() && !err)
-                        bar.await(5, SECONDS);
+                    if (!stop.get() && !err)
+                        bar.await(5, MINUTES);
                 }
 
                 return null;
@@ -2030,11 +2067,17 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
         final AtomicInteger valCntr = new AtomicInteger(0);
 
-        GridTestUtils.runMultiThreaded(new Runnable() {
-            final ThreadLocalRandom rnd = ThreadLocalRandom.current();
+        final AtomicInteger threadSeq = new AtomicInteger(0);
 
+        GridTestUtils.runMultiThreaded(new Runnable() {
             @Override public void run() {
                 try {
+                    final ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    final int threadId = threadSeq.getAndIncrement();
+
+                    log.error("Thread id: " + threadId);
+
                     while (System.currentTimeMillis() < stopTime && !stop.get() && !err) {
                         Integer key = rnd.nextInt(PARTS);
 
@@ -2042,7 +2085,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
                         Integer prevVal = (Integer)qryClnCache.getAndPut(key, val);
 
-                        expEvts.add(new T3<>((Object)key, (Object)val, (Object)prevVal));
+                        expEvts.get(threadId).add(new T3<>((Object)key, (Object)val, (Object)prevVal));
 
                         CyclicBarrier bar = checkBarrier.get();
 
@@ -2065,7 +2108,16 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
         restartFut.get();
 
-        checkEvents(expEvts, lsnr, true);
+        List<T3<Object, Object, Object>> expEvts0 = new ArrayList<>();
+
+        for (List<T3<Object, Object, Object>> evt : expEvts) {
+            expEvts0.addAll(evt);
+
+            evt.clear();
+        }
+
+        if (!expEvts0.isEmpty())
+            checkEvents(expEvts0, lsnr, true);
 
         cur.close();