You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@ignite.apache.org by "Mengyu Jing (Jira)" <ji...@apache.org> on 2023/05/25 01:51:00 UTC

[jira] [Updated] (IGNITE-19561) Ignite thin client continuous query listener cannot listen to all events

     [ https://issues.apache.org/jira/browse/IGNITE-19561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mengyu Jing updated IGNITE-19561:
---------------------------------
    External issue URL: https://stackoverflow.com/questions/76216469/ignite-thin-client-continuous-query-listener-cannot-listen-to-all-events
           Environment: 
JDK 1.8 

Windows 10

  was:JDK 1.8 


> Ignite thin client continuous query listener cannot listen to all events
> ------------------------------------------------------------------------
>
>                 Key: IGNITE-19561
>                 URL: https://issues.apache.org/jira/browse/IGNITE-19561
>             Project: Ignite
>          Issue Type: Bug
>          Components: cache, clients
>    Affects Versions: 2.15
>         Environment: JDK 1.8 
> Windows 10
>            Reporter: Mengyu Jing
>            Priority: Major
>         Attachments: result1.log, result2.log
>
>
> *Problem scenario:*
> Start the Ignite server of one node, start one thin client and create a continuous query listener, and then use 50 threads to add 500 data to the cache concurrently.
> *Problem phenomenon:*
> Through the information printed on the listener, it was found that the number of events listened to each time varies, possibly 496, 498, 499 or 500...
> *Test Code:*
> {code:java}
> import org.apache.ignite.Ignite;
> import org.apache.ignite.Ignition;
> import org.apache.ignite.configuration.IgniteConfiguration;
> import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
> import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
> import java.util.ArrayList;
> import java.util.List;
> public class StartServer {
>     public static void main(String[] args) {
>         IgniteConfiguration igniteConfiguration = new IgniteConfiguration();
>         TcpDiscoverySpi spi = new TcpDiscoverySpi();
>         List<String> addrList = new ArrayList<>();
>         addrList.add("127.0.0.1:47500");
>         TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
>         ipFinder.setAddresses(addrList);
>         spi.setIpFinder(ipFinder);
>         igniteConfiguration.setDiscoverySpi(spi);
>         Ignite ignite = Ignition.start(igniteConfiguration);
>     }
> }
> {code}
> {code:java}
> package com.example.continuebug;
> import org.apache.ignite.Ignition;
> import org.apache.ignite.cache.query.ContinuousQuery;
> import org.apache.ignite.client.ClientCache;
> import org.apache.ignite.client.IgniteClient;
> import org.apache.ignite.configuration.ClientConfiguration;
> import javax.cache.event.CacheEntryEvent;
> import javax.cache.event.CacheEntryListenerException;
> import javax.cache.event.CacheEntryUpdatedListener;
> import java.util.Iterator;
> public class StartThinClient {
>     public static void main(String[] args) throws InterruptedException {
>         String addr = "127.0.0.1:10800";
>         int threadNmu = 50;
>         ClientConfiguration clientConfiguration = new ClientConfiguration();
>         clientConfiguration.setAddresses(addr);
>         IgniteClient client1 = Ignition.startClient(clientConfiguration);
>         ClientCache<Object, Object> cache1 = client1.getOrCreateCache("test");
>         ContinuousQuery<Object, Object> query = new ContinuousQuery<>();
>         query.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
>             @Override
>             public void onUpdated(Iterable<CacheEntryEvent<?, ?>> cacheEntryEvents) throws CacheEntryListenerException {
>                 Iterator<CacheEntryEvent<?, ?>> iterator = cacheEntryEvents.iterator();
>                 while (iterator.hasNext()) {
>                     CacheEntryEvent<?, ?> next = iterator.next();
>                     System.out.println("----" + next.getKey());
>                 }
>             }
>         });
>         cache1.query(query);
>         IgniteClient client2 = Ignition.startClient(clientConfiguration);
>         ClientCache<Object, Object> cache2 = client2.cache("test");
>         Thread[] threads = new Thread[threadNmu];
>         for (int i = 0; i < threads.length; ++i) {
>             threads[i] = new Thread(new OperationInsert(cache2, i, 500, threadNmu));
>         }
>         for (int i = 0; i < threads.length; ++i) {
>             threads[i].start();
>         }
>         for (Thread thread : threads) {
>             thread.join();
>         }
>         Thread.sleep(60000);
>     }
>     static class OperationInsert implements Runnable {
>         private ClientCache<Object, Object> cache;
>         private int k;
>         private Integer test_rows;
>         private Integer thread_cnt;
>         public OperationInsert(ClientCache<Object, Object> cache, int k, Integer test_rows, Integer thread_cnt) {
>             this.cache = cache;
>             this.k = k;
>             this.test_rows = test_rows;
>             this.thread_cnt = thread_cnt;
>         }
>         @Override
>         public void run() {
>             for (int i = 1000000 + (test_rows/thread_cnt) * k; i < 1000000 + (test_rows/thread_cnt) * (k + 1); i++) {
>                 cache.put("" + i, "aaa");
>             }
>         }
>     }
> } {code}
> *Running results:*
> *[^result1.log][^result2.log]*
> *Version:*
> The testing program uses Ignite version 2.15.0.
> I attempted to insert data using one thread and did not observe any event loss. In addition, I also attempted an Ignite cluster with two or three nodes, which can still listen to all 500 events even when inserting data using multiple threads.This problem seems to only occur when concurrent threads insert data into a node.{*}{*}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)