Posted to user@ignite.apache.org by akorensh <al...@gmail.com> on 2020/05/07 20:30:51 UTC

Re: Event for an update that should have been filtered is received in Local Listener of Continuous Query when a 1000 row insert is triggered

Hi,
  It shouldn't work the way you described.
  Continuous query clients should receive only the records that pass
their filters/transformers.

  Verify the continuous query definitions using the appropriate view:
https://apacheignite.readme.io/docs/continuous_queries

  Take a look at: https://apacheignite.readme.io/docs/continuous-queries

  and at the example here:
https://github.com/apache/ignite/blob/master/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java

  Send a reproducer (and the version you are on) and I'll take a look.

Thanks, Alex


  



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/

Re: Event for an update that should have been filtered is received in Local Listener of Continuous Query when a 1000 row insert is triggered

Posted by VeenaMithare <v....@cmcmarkets.com>.
Hello, 

It looks like this issue is not related to load, but to the remote filter
not being redeployed when a server node is restarted on version 2.7.6.

Related discussion at 
http://apache-ignite-users.70518.x6.nabble.com/Remote-Filter-Execution-td33274.html

regards,
Veena.




Re: Event for an update that should have been filtered is received in Local Listener of Continuous Query when a 1000 row insert is triggered

Posted by akorensh <al...@gmail.com>.
Hi,
   handleEntry() performs internal housekeeping chores.
  
   You are correct: a notification is sent to local listeners whether an
entry was filtered or not.
   Your actual local listener will be called only if the entry has passed
your filter criteria.

  If you place a breakpoint inside
CacheContinuousQueryHandler#notifyLocalListener you will see that
  the message for a filtered entry gets rejected at this check:
      if (F.isEmpty(evts))     return;
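
A minimal plain-Java model of that early return may help when reading the debugger output. It is only a sketch of the idea, not Ignite's implementation: `Entry`, `toEvents`, `notifyLocalListener` and `deliveredKeys` below are hypothetical stand-ins, and the real CacheContinuousQueryHandler does considerably more.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java model of "a notification arrives, but the user listener is skipped". */
final class LocalListenerGuardSketch {
    /** Hypothetical stand-in for a continuous query entry (not an Ignite class). */
    static final class Entry {
        final int key;
        final String value;
        final boolean filtered;

        Entry(int key, String value, boolean filtered) {
            this.key = key;
            this.value = value;
            this.filtered = filtered;
        }
    }

    /** Filtered entries produce no events; unfiltered entries produce exactly one. */
    static List<Entry> toEvents(Entry e) {
        return e.filtered ? Collections.<Entry>emptyList() : Collections.singletonList(e);
    }

    /** Models the early return: an empty event list never reaches the user listener. */
    static void notifyLocalListener(List<Entry> evts, Consumer<List<Entry>> userListener) {
        if (evts.isEmpty())
            return;

        userListener.accept(evts);
    }

    /** Feeds every entry through the guard and returns the keys the user listener saw. */
    static List<Integer> deliveredKeys(List<Entry> entries) {
        List<Integer> seen = new ArrayList<>();

        for (Entry e : entries)
            notifyLocalListener(toEvents(e), evts -> evts.forEach(x -> seen.add(x.key)));

        return seen;
    }

    public static void main(String[] args) {
        List<Entry> entries = Arrays.asList(
            new Entry(0, "0", false),
            new Entry(1, "1", true),   // filtered: notification arrives, listener is not called
            new Entry(2, "2", false));

        System.out.println(deliveredKeys(entries)); // prints [0, 2]
    }
}
```

In this model all three entries reach `notifyLocalListener`, but the user callback runs only for keys 0 and 2.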



   In general, you shouldn't need to go so deep into the internals.

Try the following:

server:
        IgniteCache<Integer, String> cache = ignite.getOrCreateCache(CACHE_NAME);

        int i = 0;
        while (true) {
            // Store each key with its own string value, then advance the counter.
            cache.put(i, Integer.toString(i));
            System.out.println("added entry: " + i);
            i++;
            Thread.sleep(100);
        }


client:
        IgniteCache<Integer, String> cache = ignite.getOrCreateCache(CACHE_NAME);

        ContinuousQuery<Integer, String> qry = new ContinuousQuery<>();

        // Called on the client for the events that passed the remote filter.
        qry.setLocalListener((evts) -> evts.forEach(e ->
            System.out.println("key=" + e.getKey() + ", val=" + e.getValue())));

        // Remote filter: only events with even keys are passed on.
        qry.setRemoteFilterFactory((Factory<CacheEntryEventFilter<Integer, String>>)
            () -> (CacheEntryEventFilter<Integer, String>) e -> e.getKey() % 2 == 0);

        cache.query(qry);


  Only the events that pass the filter (even keys) should arrive.
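
To make the expected client output concrete, here is a small plain-Java simulation that needs no cluster. It is a sketch under assumptions: `FILTER_FACTORY` and `expectedOutput` are hypothetical names, a `Supplier` of a `BiPredicate` stands in for the remote filter factory, and each key is assumed to be stored with its own string value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;
import java.util.function.Supplier;

/** Simulates which events the even-key filter would let through. */
final class EvenKeyFilterSketch {
    /** Stand-in for the remote filter factory: each call builds a fresh even-key filter. */
    static final Supplier<BiPredicate<Integer, String>> FILTER_FACTORY =
        () -> (key, val) -> key % 2 == 0;

    /** Simulates puts of keys 0..count-1 and collects the lines the listener would print. */
    static List<String> expectedOutput(int count) {
        BiPredicate<Integer, String> filter = FILTER_FACTORY.get();
        List<String> lines = new ArrayList<>();

        for (int key = 0; key < count; key++) {
            String val = Integer.toString(key); // assumption: value mirrors the key

            if (filter.test(key, val))
                lines.add("key=" + key + ", val=" + val);
        }

        return lines;
    }

    public static void main(String[] args) {
        // Prints the lines for keys 0, 2 and 4 only.
        expectedOutput(6).forEach(System.out::println);
    }
}
```

If the real client prints a line for an odd key, the remote filter was not applied to that event.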

Thanks, Alex




Re: Event for an update that should have been filtered is received in Local Listener of Continuous Query when a 1000 row insert is triggered

Posted by VeenaMithare <v....@cmcmarkets.com>.
Hi Alex,

I am not able to reproduce this issue, though we have seen it happen a few
times in our test env.

I am trying to go through logs and code to understand how it can happen. 


It looks like, when this situation occurred, all the local listeners for
this cache (about 6 of them) received the 'untransformed' entry, which
ideally should not have been delivered at all. All 6 local listeners
printed the evt.

What has been observed for the CQ that has received this event incorrectly:
a. On the server logs, the remote filter says that the event entry has not
passed the filter
b. No logs for the remote transformer are printed.
c. The local listener has received the 'untransformed' entry


In CacheContinuousQueryHandler,
    private void onEntryUpdate(CacheContinuousQueryEvent<K, V> evt,
        boolean notify, boolean loc, boolean recordIgniteEvt) {

I see these lines:
1.            if (loc) {
2.                if (!locOnly) {
3.                    Collection<CacheEntryEvent<? extends K, ? extends V>> evts = handleEvent(ctx, entry);

4.                    notifyLocalListener(evts, trans);

5.                    if (!internal && !skipPrimaryCheck)
6.                        sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx);
7.                }
8.                else {
9.                    if (!entry.isFiltered())
10.                        notifyLocalListener(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt), trans);
11.                }
12.            }
13.            else {
14.                if (!entry.isFiltered()) {
15.                    if (trans != null)
16.                        entry = transformToEntry(trans, evt);

17.                    prepareEntry(cctx, nodeId, entry);
18.                }

19.                Object entryOrList = handleEntry(cctx, entry);

20.                if (entryOrList != null) {
21.                    if (log.isDebugEnabled())
22.                        log.debug("Send the following event to listener: " + entryOrList);

23.                    ctx.continuous().addNotification(nodeId, routineId, entryOrList, topic, sync, true);
24.                }
            }


If entry.isFiltered() returns true, the check on line 14 above fails and the
remote transformer for the entry is not executed. However, the entry is
still processed on line 19.
What does handleEntry do?

I noticed that every update notification is sent to all local listeners,
regardless of whether the entry is filtered or not.

On the local listener side, if entry.isFiltered() returns false, the entry
is picked up and the local listener is called.

Kindly confirm my understanding.
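
The reading of the remote branch (lines 13 to 24 of the listing) can be written down as a small plain-Java model. This is a sketch of my understanding only, not Ignite code: `Entry`, `onRemoteUpdate` and `deliverToListener` are hypothetical names, and handleEntry is modelled as a pass-through, which is an assumption since its real behaviour is exactly what is being asked about.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Plain-Java model of the remote branch; none of these types are Ignite classes. */
final class RemoteBranchSketch {
    /** Hypothetical stand-in for the continuous query entry. */
    static final class Entry {
        final int key;
        final Object value;
        final boolean filtered;

        Entry(int key, Object value, boolean filtered) {
            this.key = key;
            this.value = value;
            this.filtered = filtered;
        }
    }

    /** Sending side: the transformer runs only for unfiltered entries, yet every entry is forwarded. */
    static Entry onRemoteUpdate(Entry e, Function<Object, Object> trans, List<Integer> transformedKeys) {
        if (!e.filtered && trans != null) {
            transformedKeys.add(e.key);
            e = new Entry(e.key, trans.apply(e.value), false);
        }

        // Assumption: handleEntry is modelled as a pass-through here.
        return e;
    }

    /** Receiving side: filtered entries are dropped before the user listener runs. */
    static List<Object> deliverToListener(List<Entry> received) {
        List<Object> delivered = new ArrayList<>();

        for (Entry e : received) {
            if (!e.filtered)
                delivered.add(e.value);
        }

        return delivered;
    }

    public static void main(String[] args) {
        List<Integer> transformedKeys = new ArrayList<>();
        Function<Object, Object> trans = v -> "T(" + v + ")";

        List<Entry> sent = Arrays.asList(
            onRemoteUpdate(new Entry(1, "a", false), trans, transformedKeys),
            onRemoteUpdate(new Entry(2, "b", true), trans, transformedKeys));

        System.out.println("transformed keys: " + transformedKeys);   // prints transformed keys: [1]
        System.out.println("delivered: " + deliverToListener(sent));  // prints delivered: [T(a)]
    }
}
```

In this model a filtered entry is forwarded untransformed and then dropped on the receiving side. What we observed differs: the untransformed entry reached the listener, as if the filtered flag had been lost on the way.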

regards,
Veena






Re: Event for an update that should have been filtered is received in Local Listener of Continuous Query when a 1000 row insert is triggered

Posted by akorensh <al...@gmail.com>.
It might; I would need to see a reproducer to make a determination.




Re: Event for an update that should have been filtered is received in Local Listener of Continuous Query when a 1000 row insert is triggered

Posted by VeenaMithare <v....@cmcmarkets.com>.
I see this line being printed just before any local listener is invoked:
2020-05-06T16:28:17,909 INFO  o.a.i.s.c.t.TcpCommunicationSpi
[grid-nio-worker-tcp-comm-4-#255%ActivDataPublisher-ACTIVEI2-igniteclient-GREEN%]:
Accepted incoming communication connection [locAddr=/x.x.x.x:yyy,
rmtAddr=/x.x.x.x:yyy]

The remote address is the address of the ignite server node.


The question is: should a client whose remote filter is expected to filter
out an update get this line at all?





Re: Event for an update that should have been filtered is received in Local Listener of Continuous Query when a 1000 row insert is triggered

Posted by VeenaMithare <v....@cmcmarkets.com>.
Hi Alex, 

Thank you for the reply.
>>  verify the continuous query definitions using the appropriate view:
https://apacheignite.readme.io/docs/continuous_queries

We are on version 2.7.6, so I guess this view is not available to us. Also,
we have been running the CQs for a couple of months now in our test env. and
had not faced any issues.

This issue is more recent and happens only sometimes (I cannot figure out
what could have caused it yet). Though the issue has happened a couple of
times on our test env., I am not able to reproduce it on my local machine.
I was able to cause this failure only once on the Linux test env. (never so
far on my Windows machine, even though I have tested many scenarios).
It looks like some exceptional scenario or some race condition has caused
this.

Please note that we have recently added an EVT_NODE_SEGMENTED handler, and
we also have a handler for a cluster switch request, where we switch to a
different cluster based on updates to a particular record in a particular
table. Both events are handled by calling ignite.close() and
Ignition.start() (with the right cluster config).

As mentioned, we have tested the event handler and huge inserts/updates
after segmentation etc., and I am not able to cause this issue to occur.

regards,
Veena


