Posted to issues@ignite.apache.org by "Mengyu Jing (Jira)" <ji...@apache.org> on 2023/05/25 01:47:00 UTC

[jira] [Created] (IGNITE-19561) Ignite thin client continuous query listener cannot listen to all events

Mengyu Jing created IGNITE-19561:
------------------------------------

             Summary: Ignite thin client continuous query listener cannot listen to all events
                 Key: IGNITE-19561
                 URL: https://issues.apache.org/jira/browse/IGNITE-19561
             Project: Ignite
          Issue Type: Bug
          Components: cache, clients
    Affects Versions: 2.15
         Environment: JDK 1.8 
            Reporter: Mengyu Jing
         Attachments: result1.log, result2.log

*Problem scenario:*

Start a single-node Ignite server, start a thin client and register a continuous query listener on a cache, then use 50 threads to put 500 entries into that cache concurrently.

*Problem phenomenon:*

The keys printed by the listener show that the number of events received varies from run to run: it may be 496, 498, 499 or 500.
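To see which events are missing rather than only how many, the listener could record each key it receives and compare the result with the expected key range after the writers finish. The following is a minimal sketch using only the JDK. The class `ReceivedKeyTracker` and its methods are hypothetical names introduced for illustration and are not part of the original reproducer or of the Ignite API.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not part of the original report): records the keys a
// listener receives so that the missing ones can be listed after the run.
class ReceivedKeyTracker {
    // Thread-safe set, because listener callbacks may run on another thread.
    private final Set<String> received = ConcurrentHashMap.newKeySet();

    // Call this from the listener callback for every event key.
    void record(Object key) {
        received.add(String.valueOf(key));
    }

    int receivedCount() {
        return received.size();
    }

    // Returns the keys in [firstKey, firstKey + expected) that were never recorded.
    Set<String> missingKeys(int firstKey, int expected) {
        Set<String> missing = new TreeSet<>();
        for (int i = firstKey; i < firstKey + expected; i++) {
            String key = String.valueOf(i);
            if (!received.contains(key)) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        ReceivedKeyTracker tracker = new ReceivedKeyTracker();
        // Simulate a run in which the event for key 1000003 never arrives.
        for (int i = 1000000; i < 1000500; i++) {
            if (i != 1000003) {
                tracker.record("" + i);
            }
        }
        System.out.println("received=" + tracker.receivedCount());
        System.out.println("missing=" + tracker.missingKeys(1000000, 500));
    }
}
```

In the reproducer, `tracker.record(next.getKey())` would replace the `System.out.println` call inside `onUpdated`, and `missingKeys(1000000, 500)` would be printed after the writer threads have been joined and the final sleep has elapsed.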

*Test Code:*
{code:java}
import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;

import java.util.ArrayList;
import java.util.List;

public class StartServer {
    public static void main(String[] args) {
        IgniteConfiguration igniteConfiguration = new IgniteConfiguration();
        TcpDiscoverySpi spi = new TcpDiscoverySpi();
        List<String> addrList = new ArrayList<>();
        addrList.add("127.0.0.1:47500");
        TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
        ipFinder.setAddresses(addrList);
        spi.setIpFinder(ipFinder);
        igniteConfiguration.setDiscoverySpi(spi);
        Ignite ignite = Ignition.start(igniteConfiguration);
    }
}
{code}
{code:java}
package com.example.continuebug;

import org.apache.ignite.Ignition;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.client.ClientCache;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.configuration.ClientConfiguration;

import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryListenerException;
import javax.cache.event.CacheEntryUpdatedListener;
import java.util.Iterator;

public class StartThinClient {
    public static void main(String[] args) throws InterruptedException {
        String addr = "127.0.0.1:10800";

        int threadNmu = 50;

        ClientConfiguration clientConfiguration = new ClientConfiguration();
        clientConfiguration.setAddresses(addr);

        IgniteClient client1 = Ignition.startClient(clientConfiguration);

        ClientCache<Object, Object> cache1 = client1.getOrCreateCache("test");

        ContinuousQuery<Object, Object> query = new ContinuousQuery<>();
        query.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
            @Override
            public void onUpdated(Iterable<CacheEntryEvent<?, ?>> cacheEntryEvents) throws CacheEntryListenerException {
                Iterator<CacheEntryEvent<?, ?>> iterator = cacheEntryEvents.iterator();
                while (iterator.hasNext()) {
                    CacheEntryEvent<?, ?> next = iterator.next();
                    System.out.println("----" + next.getKey());
                }
            }
        });

        cache1.query(query);

        IgniteClient client2 = Ignition.startClient(clientConfiguration);
        ClientCache<Object, Object> cache2 = client2.cache("test");

        Thread[] threads = new Thread[threadNmu];
        for (int i = 0; i < threads.length; ++i) {
            threads[i] = new Thread(new OperationInsert(cache2, i, 500, threadNmu));
        }
        for (int i = 0; i < threads.length; ++i) {
            threads[i].start();
        }
        for (Thread thread : threads) {
            thread.join();
        }

        Thread.sleep(60000);

    }

    static class OperationInsert implements Runnable {

        private ClientCache<Object, Object> cache;
        private int k;
        private Integer test_rows;
        private Integer thread_cnt;

        public OperationInsert(ClientCache<Object, Object> cache, int k, Integer test_rows, Integer thread_cnt) {
            this.cache = cache;
            this.k = k;
            this.test_rows = test_rows;
            this.thread_cnt = thread_cnt;
        }

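        @Override
        public void run() {
            // Each thread writes its own disjoint range of (test_rows / thread_cnt) keys.
            int perThread = test_rows / thread_cnt;
            int start = 1000000 + perThread * k;
            for (int i = start; i < start + perThread; i++) {
                cache.put("" + i, "aaa");
            }
        }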
    }

}
{code}
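The loop bounds in `OperationInsert.run()` give each thread its own block of keys, so the writers should never overlap. The sketch below repeats that arithmetic with plain JDK code to check that 50 threads and 500 rows yield 500 distinct keys. `KeyRangeCheck`, `rangeFor` and `allKeys` are hypothetical names used only for this illustration.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical check (not part of the original report) of the key ranges
// produced by the reproducer's loop bounds.
class KeyRangeCheck {
    // Mirrors the loop bounds of OperationInsert.run() for thread index k.
    // Returns {start, end} of the half-open range [start, end).
    static int[] rangeFor(int k, int testRows, int threadCnt) {
        int perThread = testRows / threadCnt; // integer division
        int start = 1000000 + perThread * k;
        return new int[] {start, start + perThread};
    }

    // Collects every key written by all threads; duplicates would collapse in the set.
    static Set<String> allKeys(int testRows, int threadCnt) {
        Set<String> keys = new HashSet<>();
        for (int k = 0; k < threadCnt; k++) {
            int[] range = rangeFor(k, testRows, threadCnt);
            for (int i = range[0]; i < range[1]; i++) {
                keys.add("" + i);
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        int[] first = rangeFor(0, 500, 50);
        int[] last = rangeFor(49, 500, 50);
        System.out.println("thread 0:  [" + first[0] + ", " + first[1] + ")");
        System.out.println("thread 49: [" + last[0] + ", " + last[1] + ")");
        System.out.println("distinct keys: " + allKeys(500, 50).size());
    }
}
```

Because every put targets a different key, each of the 500 puts is expected to raise its own event, so a count below 500 points to events that were not delivered rather than to writes that overlapped. Note that the integer division would silently drop the remainder if the row count were not a multiple of the thread count. That does not apply here, since 500 / 50 = 10.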
*Running results:*

*[^result1.log][^result2.log]*

*Version:*

The testing program uses Ignite version 2.15.0.

When I inserted the data using a single thread, I did not observe any event loss. I also tried Ignite clusters with two and three nodes, and the listener still received all 500 events even when multiple threads inserted data. The problem seems to occur only when concurrent threads insert data into a single-node cluster.


