You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@ignite.apache.org by VeenaMithare <v....@cmcmarkets.com> on 2020/05/07 08:27:55 UTC

Incorrect Value Received in Local Listener of Continuous Query when a 1000 row insert is triggered

We have about 5 clients. Each client starts 2 different CQ to the same cache
abc . (We need 2 CQ since we want different action and different lifecycle
when the local listener is triggered for either query. )

Client 1 : 
Say CQ1 is listening to changes to a record where name is name1. (There is a
remote filter that filters the any update on abc and returns true if the
name is name1. There is also a remote filter that returns only 2 fields out
of the 100 fields of cache abc.)
and CQ2 is listening to changes to a record where name is name2. (There is a
remote filter that filters the any update on abc and returns true if the
name is name2. There is also a remote filter that returns only 2 fields out
of the 100 fields of cache abc.)

Client 2
CQ3 is listening to changes to a record where name is name1 ( Similar to CQ1
)
CQ4 is listening to all updates on this cache abc. The remote filter passes
all records on this cache. The remote transformer returns all 100 fields of
the cache abc.

What we have observed is as below ;
1. Insert about 1000 rows  in to cache abc
2. The local listener for client1 - CQ1 - receives updates meant for Client2
- CQ4. 

Could you kindly guide on what I am doing incorrectly. 

Please note, that if I update only one record, the local listeners are
invoked correctly for each query.

regards,
Veena



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/

Re: Event for an update that should have been filtered is received in Local Listener of Continuous Query when a 1000 row insert is triggered

Posted by VeenaMithare <v....@cmcmarkets.com>.
Hello, 

Looks like this issue is not related to load, but related to remote filter
not getting redeployed when a server node is restarted on version 2.7.6. 

Related discussion at 
http://apache-ignite-users.70518.x6.nabble.com/Remote-Filter-Execution-td33274.html

regards,
Veena.



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/

Re: Event for an update that should have been filtered is received in Local Listener of Continuous Query when a 1000 row insert is triggered

Posted by akorensh <al...@gmail.com>.
Hi,
   handleEntry() performs internal housekeeping chores.
  
   You are correct a notification is sent to localListeners whether an entry
was filtered or not.
   your actual local listener will be called only if the entry has passed
your filter criteria.

  If you place a breakpoint inside
CacheContinuousQueryHandler#notifyLocalListener you will see that 
  this message gets rejected by the caller.
  here:    if (F.isEmpty(evts))     return;



   In general you shouldn't need to go so far deeply into the internals:

Try the following:

server:
        IgniteCache<Integer, String> cache =
ignite.getOrCreateCache(CACHE_NAME);


        int i = 0;
        while (true) {
            cache.put(i++, Integer.toString(i));
            System.out.println("added entry: " + i);
           Thread.sleep(100);
        }


client:
        IgniteCache<Integer, String> cache =
ignite.getOrCreateCache(CACHE_NAME);

                    ContinuousQuery < Integer, String > qry = new
ContinuousQuery<>();
                    qry.setLocalListener((evts) -> evts.forEach(e ->
System.out.println("key=" + e.getKey() + ", val=" + e.getValue())));
                   
qry.setRemoteFilterFactory((Factory<CacheEntryEventFilter&lt;Integer,
String>>) () -> (CacheEntryEventFilter<Integer, String>) e -> e.getKey() % 2
== 0);
                    cache.query(qry);


  Only the filtered events should arrive.

Thanks, Alex



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/

Re: Event for an update that should have been filtered is received in Local Listener of Continuous Query when a 1000 row insert is triggered

Posted by VeenaMithare <v....@cmcmarkets.com>.
HI  Alex,

I am not able to reproduce this issue - we have seen it happening a few
times in our test env. though. 

I am trying to go through logs and code to understand how it can happen. 


It looks like all the local listeners for this cache( about 6 of them ) 
have received the 'untransformed' entry  that should have ideally been not
received when this situation occurred. The local listeners for all the 6 of
them have printed the evt. 

What has been observed for the CQ that has received this event incorrectly:
a. On the server logs, the remote filter says that the event entry has not
passed the filter
b. No logs for the remote transformer are printed.
c. The local listener has received the 'untransformed' entry


On the CacheContinuousQueryHandler , 
    private void onEntryUpdate(CacheContinuousQueryEvent<K, V> evt,
        boolean notify, boolean loc, boolean recordIgniteEvt) {

I see these lines : 
1.            if (loc) {
2.                if (!locOnly) {
3.                    Collection<CacheEntryEvent&lt;? extends K, ? extends
V>> evts = handleEvent(ctx, entry);

4.                    notifyLocalListener(evts, trans);

5.                    if (!internal && !skipPrimaryCheck)
6.                       
sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx);
7.                }
8.                else {
9.                    if (!entry.isFiltered())
10.                        notifyLocalListener(F.<CacheEntryEvent&lt;?
extends K, ? extends V>>asList(evt), trans);
11.                }
12.            }
13.            else {
14.                if (!entry.isFiltered()) {
15.                    if (trans != null)
16.                        entry = transformToEntry(trans, evt);

17.                    prepareEntry(cctx, nodeId, entry);
18.                }

19.                Object entryOrList = handleEntry(cctx, entry);

20.                if (entryOrList != null) {
21.                    if (log.isDebugEnabled())
22.                        log.debug("Send the following event to listener:
" + entryOrList);

23.                    ctx.continuous().addNotification(nodeId, routineId,
entryOrList, topic, sync, true);
24.                }
            }


If entry.isFiltered() returns false as per line 14 above, the remote
transformer for the entry is not executed. However the entry is processed on 
line 19 .
 What does handleEntry do ?

I noticed that every update notification is sent to all locallisteners
regardless of whether the entry is filtered or not . 

On the locallistener, if entry.isFiltered() returns false, the entry is
picked up and the local listener is called .

Kindly confirm my understanding .

regards,
Veena





--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/

Re: Event for an update that should have been filtered is received in Local Listener of Continuous Query when a 1000 row insert is triggered

Posted by akorensh <al...@gmail.com>.
It might, I would need to see a reproducer to make a determination.



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/

Re: Event for an update that should have been filtered is received in Local Listener of Continuous Query when a 1000 row insert is triggered

Posted by VeenaMithare <v....@cmcmarkets.com>.
I see this line being printed just before any local listener is invoked :
2020-05-06T16:28:17,909 INFO  o.a.i.s.c.t.TcpCommunicationSpi
[grid-nio-worker-tcp-comm-4-#255%ActivDataPublisher-ACTIVEI2-igniteclient-GREEN%]:
Accepted incoming communication connection [locAddr=/x.x.x.x:yyy,
rmtAddr=/x.x.x.x:yyy]

The remote address is the address of the ignite server node.


The question is, should a client whose remote filter should filter out an
update , get this line at all ?




--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/

Re: Event for an update that should have been filtered is received in Local Listener of Continuous Query when a 1000 row insert is triggered

Posted by VeenaMithare <v....@cmcmarkets.com>.
Hi Alex, 

Thank you for the reply . 
>>  verify the continuous query definitions using the appropriate view:
https://apacheignite.readme.io/docs/continuous_queries

We are on 2.7.6 version. I guess this view is not available for us. Also we
have been running the CQs for couple of months now in our test env. and have
not faced any issues.
 
This issue is more recent and happens sometimes ( cannot figure out what
could have caused it yet. ) . Though the issue has happened couple of times
on our test env. - I am not able to reproduce this on my local machine. Also
`i was able to cause this failure only once on the linux test env ( never so
far on my windows machine even though I have tested many scenarios so far. )
It looks like some exceptional scenario or some race condition has caused
this. 

Please note that we have recently put a EVT_NODE_SEGMENTED HANDLER and we
have a handler to a cluster switch request  where switch to a different
cluster based on updates on a particular record on a particular table- the
handling of both events is to do ignite.close() and Ignition.start()( with
the right cluster config ). 

As mentioned, we have tested the event handler and huge inserts/updates
after segmentation etc. and I am not able to cause this issue to occur.  

regards,
Veena



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/

Re: Event for an update that should have been filtered is received in Local Listener of Continuous Query when a 1000 row insert is triggered

Posted by akorensh <al...@gmail.com>.
Hi,
  It shouldn't work like you described:
  Continuous query clients should only be receiving the records that are
pertinent to their filters/transformers.

  verify the continuous query definitions using the appropriate view:
https://apacheignite.readme.io/docs/continuous_queries

  take a look at : https://apacheignite.readme.io/docs/continuous-queries

  and the example here: 
https://github.com/apache/ignite/blob/master/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java

  Send a reproducer (And the version you are on) and I'll take a look.

Thanks, Alex


  



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/