Posted to issues@ignite.apache.org by "Nikolay Izhikov (JIRA)" <ji...@apache.org> on 2018/03/28 16:03:00 UTC

[jira] [Comment Edited] (IGNITE-8035) Duplicated events with type CREATED in ContinuousQuery's Events Listener

    [ https://issues.apache.org/jira/browse/IGNITE-8035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16417629#comment-16417629 ] 

Nikolay Izhikov edited comment on IGNITE-8035 at 3/28/18 4:02 PM:
------------------------------------------------------------------

Hello, [~ruslangm]

I wrote a test to try to reproduce your issue,
but I couldn't reproduce it.

It seems that you misused the Continuous Query API.
If you need the Ignite instance inside a remote filter, inject it with the @IgniteInstanceResource annotation.
Please take a look at RemoteFactory in the test.

{code:java}
package org.apache.ignite.internal.processors.cache.query.continuous;

import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;
import javax.cache.Cache;
import javax.cache.configuration.Factory;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryListenerException;
import javax.cache.event.CacheEntryUpdatedListener;
import javax.cache.event.EventType;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.lang.IgniteAsyncCallback;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;

public class GridCacheContinuousQueryDuplicateEventsTest extends GridCommonAbstractTest implements Serializable {
    /** IP finder. */
    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);

    /** */
    private static final long TIMEOUT = 60_000L;

    public static final long DATA_AMOUNT = 10_000L;

    /** {@inheritDoc} */
    @SuppressWarnings("unchecked")
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

        cfg.setPeerClassLoadingEnabled(true);

        CacheConfiguration cacheCfg = defaultCacheConfiguration();

        cacheCfg.setCacheMode(PARTITIONED);
        cacheCfg.setAtomicityMode(ATOMIC);
        cacheCfg.setWriteSynchronizationMode(FULL_ASYNC);
        cacheCfg.setBackups(2);

        cfg.setCacheConfiguration(cacheCfg);

        TcpDiscoverySpi disco = new TcpDiscoverySpi();

        disco.setIpFinder(IP_FINDER);

        cfg.setDiscoverySpi(disco);

        //((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);

        return cfg;
    }

    /** {@inheritDoc} */
    @Override protected void beforeTestsStarted() throws Exception {
        startGridsMultiThreaded(2);
    }

    /** {@inheritDoc} */
    @Override protected void afterTestsStopped() throws Exception {
        stopAllGrids();
    }

    /**
     * @throws Exception If failed.
     */
    public void testDuplicateEvents() throws Exception {
        Ignite grid0 = grid(0);
        
        ClusterNode node0 = grid0.cluster().localNode();
        
        Ignite grid1 = grid(1);
        
        ClusterNode node1 = grid1.cluster().localNode();

        IgniteCache<String, Long> cache = grid0.getOrCreateCache(DEFAULT_CACHE_NAME);

        ContinuousQuery<String, Long> qry0 = new ContinuousQuery<>();
        
        EventListener lsnr0 = new EventListener();

        qry0.setLocalListener(lsnr0)
            .setRemoteFilterFactory(new RemoteFactory(node0));

        ContinuousQuery<String, Long> qry1 = new ContinuousQuery<>();

        EventListener lsnr1 = new EventListener();

        qry1.setLocalListener(lsnr1)
            .setRemoteFilterFactory(new RemoteFactory(node1));

        try (QueryCursor<Cache.Entry<String, Long>> cursor0 = cache.query(qry0);
            QueryCursor<Cache.Entry<String, Long>> cursor1 = cache.query(qry1)) {

            // Put and then remove every entry, so each key is expected to produce exactly one CREATED event.
            for (long i = 0; i < DATA_AMOUNT; i++) {
                cache.put("" + i, i);

                cache.remove("" + i, i);
            }

            boolean allEvtsRcvd = GridTestUtils.waitForCondition(new GridAbsPredicate() {
                @Override public boolean apply() {
                    return lsnr0.evtsCnt() + lsnr1.evtsCnt() >= DATA_AMOUNT;
                }
            }, TIMEOUT);

            assertTrue("All events are received by listener", allEvtsRcvd);
            
            assertEquals("All events are received by listener", DATA_AMOUNT, lsnr0.evtsCnt() + lsnr1.evtsCnt());
            
            stopGrid(0);
            
            Thread.sleep(10_000L);
            
            assertEquals("No new events after stop grid", DATA_AMOUNT, lsnr0.evtsCnt() + lsnr1.evtsCnt());
        }
    }

    @IgniteAsyncCallback
    public static class EventListener implements CacheEntryUpdatedListener<String, Long> {
        /** Number of CREATED events received so far. */
        private final AtomicLong evtsCnt = new AtomicLong();
        
        @Override
        public void onUpdated(
            Iterable<CacheEntryEvent<? extends String, ? extends Long>> events) throws CacheEntryListenerException {
            for (CacheEntryEvent<? extends String, ? extends Long> event : events) {
                if (event.getEventType() == EventType.CREATED) {
                    evtsCnt.incrementAndGet();
                }
            }
        }

        public long evtsCnt() {
            return evtsCnt.get();
        }
    }

    @IgniteAsyncCallback
    public class RemoteFactory implements Factory<CacheEntryEventFilter<String, Long>> {
        private final ClusterNode node;

        public RemoteFactory(ClusterNode node) {
            this.node = node;
        }

        @Override
        public CacheEntryEventFilter<String, Long> create() {
            return new CacheEntryEventFilter<String, Long>() {
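                /** Ignite instance of the node where the filter is deployed; injected by Ignite. */
                @IgniteInstanceResource private Ignite ignite;

                @Override public boolean evaluate(CacheEntryEvent<? extends String, ? extends Long> event) throws CacheEntryListenerException {
                    // Pass events only on the node this factory was created for.
                    return node.id().equals(ignite.cluster().localNode().id());
                }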
            };
        }
    }
}
{code}
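
If you still see repeated CREATED events on your side, it may help to track them per key rather than only counting the total. Below is a minimal sketch of such a tracker in plain Java. CreatedEventTracker and its methods are hypothetical names used for illustration; they are not part of the Ignite API. The sketch assumes every key is created only once during the run, as in the test above; a key that is legitimately created, removed and created again would also be reported as a duplicate.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper (not an Ignite API): counts how often each key is reported as CREATED. */
public class CreatedEventTracker {
    /** Number of CREATED events seen per key. */
    private final Map<String, AtomicLong> createdPerKey = new ConcurrentHashMap<>();

    /**
     * Records one CREATED event for the given key.
     *
     * @return {@code true} if a CREATED event for this key had already been recorded.
     */
    public boolean recordCreated(String key) {
        return createdPerKey.computeIfAbsent(key, k -> new AtomicLong()).incrementAndGet() > 1;
    }

    /** @return Total number of CREATED events beyond the first one for each key. */
    public long duplicates() {
        long dups = 0;

        for (AtomicLong cnt : createdPerKey.values())
            dups += Math.max(0, cnt.get() - 1);

        return dups;
    }

    public static void main(String[] args) {
        CreatedEventTracker tracker = new CreatedEventTracker();

        tracker.recordCreated("1");
        tracker.recordCreated("2");
        tracker.recordCreated("1"); // Simulated redelivery of the same CREATED event.

        System.out.println("duplicates=" + tracker.duplicates());
    }
}
```

In a local listener one could call recordCreated(event.getKey()) for each event whose type is EventType.CREATED and log the key whenever the method returns true. The map is concurrent because a listener may be invoked from several threads.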


was (Author: nizhikov):
Hello, [~ruslangm]

I wrote a reproducer for your issue.

It seems that you misused the Continuous Query API.
If you need the Ignite instance inside a remote filter, inject it with the @IgniteInstanceResource annotation.
Please take a look at RemoteFactory in the test.


> Duplicated events with type CREATED in ContinuousQuery's Events Listener 
> -------------------------------------------------------------------------
>
>                 Key: IGNITE-8035
>                 URL: https://issues.apache.org/jira/browse/IGNITE-8035
>             Project: Ignite
>          Issue Type: Bug
>          Components: cache
>    Affects Versions: 2.4
>            Reporter: Ruslan Gilemzyanov
>            Assignee: Nikolay Izhikov
>            Priority: Major
>
> We encountered a bug in how ContinuousQuery's EventListener works in Ignite. I wrote a sample project to demonstrate it.
> We started 2 server nodes connected to the same cache.
> The topology snapshot became [ver=2, servers=2, clients=0, CPUs=4, heap=3.6GB]
> I put about 50 elements into the cache. The elements were distributed approximately evenly between the two nodes.
> After putting each element into the cache, we waited 100 ms (to ensure that the listener had done its work) and then deleted the element from the cache.
> Then we stopped one node. (The topology snapshot became [ver=3, servers=1, clients=0, CPUs=4, heap=1.8GB])
> After that, CREATED events for some seemingly randomly chosen elements arrived at the remaining node, even though those elements had already been deleted from the cache by that moment. In our case there were 5 such events.
> I think this is a direct violation of Continuous Query's "exactly once delivery" contract.
> The source code is here: [https://github.com/ruslangm/ignite-sample]
>  


