You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@ignite.apache.org by "Maxim Muzafarov (Jira)" <ji...@apache.org> on 2020/05/30 15:33:00 UTC

[jira] [Updated] (IGNITE-7868) Continuous Query LocalListener on backup node works unstable

     [ https://issues.apache.org/jira/browse/IGNITE-7868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Maxim Muzafarov updated IGNITE-7868:
------------------------------------
    Labels: cq_todo  (was: )

> Continuous Query LocalListener on backup node works unstable
> ------------------------------------------------------------
>
>                 Key: IGNITE-7868
>                 URL: https://issues.apache.org/jira/browse/IGNITE-7868
>             Project: Ignite
>          Issue Type: Bug
>    Affects Versions: 2.3
>            Reporter: Dmitry Uskov
>            Priority: Major
>              Labels: cq_todo
>
>  I have two nodes (*node1* and *node2*) with configuration:
> {code:java}
> Ignition.start();
> ProgramContext.ignite = Ignition.ignite(); //ProgramContext.ignite is public static field
> String cacheName = "A-Cache";
> CacheConfiguration<CacheKey, CacheValue> cacheConfiguration = new CacheConfiguration<>();
> cacheConfiguration.setName(cacheName);
> cacheConfiguration.setAtomicityMode(CacheAtomicityMode.ATOMIC);
> cacheConfiguration.setCacheMode(CacheMode.PARTITIONED);
> cacheConfiguration.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
> cacheConfiguration.setBackups(1);
> IgniteCache<CacheKey, CacheValue> cache = ProgramContext.ignite.getOrCreateCache(cacheConfiguration);
> ContinuousQuery<CacheKey, CacheValue> query = new ContinuousQuery<>();
> ClusterNode node = ProgramContext.ignite.cluster().localNode();
> query.setRemoteFilterFactory((Factory<CacheEntryEventFilter<CacheKey, CacheValue>>) () -> (CacheEntryEventFilter<CacheKey, CacheValue>) event -> {
>  boolean currentNodeIsPrimary = node.equals(ProgramContext.ignite.cluster().localNode());
>  boolean eventTypeIsCreated = event.getEventType().equals(javax.cache.event.EventType.CREATED);
>  return currentNodeIsPrimary && eventTypeIsCreated;
> });
> query.setLocalListener(cacheEntryEvents -> {
>  for (CacheEntryEvent<? extends CacheKey, ? extends CacheValue> cacheEntryEvent : cacheEntryEvents) {
>  System.out.println("CacheEntryUpdatedListener: " + cacheEntryEvent.getKey() + "/" + cacheEntryEvent.getValue());
>  }
> });
> cache.query(query);
> {code}
> CacheKey:
> {code:java}
> public class CacheKey {
>  public static final String DEFAULT_AFFINITY_KEY = "cons";
>  private static String affinityKeyValue = DEFAULT_AFFINITY_KEY;
>  private String key;
>  @AffinityKeyMapped
>  private String affinityKey;
>  public CacheKey(String key) {
>  this.key = key;
>  this.affinityKey = affinityKeyValue; //always DEFAULT_AFFINITY_KEY 
>  }
>  //... getters/setters and toString()
> }{code}
> CacheValue:
> {code:java}
> public class CacheValue {
>  private String value;
>  public CacheValue(String value) {
>  this.value = value;
>  }
>  //... getters/setters and toString()
>  
> }{code}
> I put two values in cache. As affinityKey always equals DAFAULT_AFFINITY_KEY one node (suppose *node1*)  is primary for both values. LocalListener has worked twice on *node1*.
> Console output:
> {noformat}
> CacheEntryUpdatedListener: CacheKey{key='node1-0', affinityKey='cons'}/CacheValue{value='node1-0'}
> CacheEntryUpdatedListener: CacheKey{key='node1-1', affinityKey='cons'}/CacheValue{value='node1-1'}
> {noformat}
>  
> Then I stoped it and I saw LocalListener on *node2* has worked once, but I expected LocalListener on *node2* had to work twice.
> Console out node1:
> {noformat}
> [12:36:36] Ignite node stopped OK [uptime=00:00:14.243]{noformat}
> Console out node2:
> {noformat}
> [12:36:36] Topology snapshot [ver=3, servers=1, clients=0, CPUs=4, heap=1.6GB]
> CacheEntryUpdatedListener: CacheKey{key='node1-1', affinityKey='cons'}/CacheValue{value='node1-1'}{noformat}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)