You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@ignite.apache.org by "Sergey Kosarev (Jira)" <ji...@apache.org> on 2023/03/07 08:12:00 UTC

[jira] [Created] (IGNITE-18976) Affinity broken on the client after reconnection

Sergey Kosarev created IGNITE-18976:
---------------------------------------

             Summary: Affinity broken on the client after reconnection
                 Key: IGNITE-18976
                 URL: https://issues.apache.org/jira/browse/IGNITE-18976
             Project: Ignite
          Issue Type: Bug
          Components: binary
    Affects Versions: 2.16
            Reporter: Sergey Kosarev


/*
 * Copyright 2019 GridGain Systems, Inc. and Contributors.
 *
 * Licensed under the GridGain Community Edition License (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.binary.BinaryTypeConfiguration;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheKeyConfiguration;
import org.apache.ignite.cache.affinity.AffinityKeyMapped;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.BinaryConfiguration;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

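/**
 * Tests that affinity mapping of cache keys stays consistent between a client node and a server node
 * after the client reconnects (reproducer for IGNITE-18976).
 */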
public class IgniteClientReconnectAffinityTest extends IgniteClientReconnectAbstractTest {
    /** Number of nodes started in {@code beforeTest()}; the tests also pass it to {@code startGrid(int)} when starting the client node. */
    private static final int SRV_CNT = 1;

    /** Not referenced anywhere in the visible part of this class. */
    private UUID nodeId;

    /** Client node; assigned by the partition tests (the "get" tests use a local variable instead). */
    private Ignite client;

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        IgniteConfiguration igniteCfg = super.getConfiguration(igniteInstanceName);

        // Communication SPI that is able to hold back selected messages.
        TestCommunicationSpi spi = new TestCommunicationSpi();
        spi.setSharedMemoryPort(-1);
        igniteCfg.setCommunicationSpi(spi);

        igniteCfg.setPeerClassLoadingEnabled(false);

        TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)igniteCfg.getDiscoverySpi();
        discoSpi.setNetworkTimeout(5000);

        // Affinity field of the key class that carries no annotation is declared through configuration.
        String notAnnotatedKeyCls = TestNotAnnotatedKey.class.getName();

        CacheKeyConfiguration keyCfg =
            new CacheKeyConfiguration(notAnnotatedKeyCls, TestNotAnnotatedKey.AFFINITY_KEY_FIELD);

        igniteCfg.setCacheKeyConfiguration(keyCfg);

        // Both key classes are listed explicitly as binary types.
        BinaryTypeConfiguration notAnnotatedType = new BinaryTypeConfiguration();
        notAnnotatedType.setTypeName(notAnnotatedKeyCls);

        BinaryTypeConfiguration annotatedType = new BinaryTypeConfiguration();
        annotatedType.setTypeName(TestAnnotatedKey.class.getName());

        BinaryConfiguration binaryCfg = new BinaryConfiguration();
        binaryCfg.setTypeConfigurations(Arrays.asList(notAnnotatedType, annotatedType));

        igniteCfg.setBinaryConfiguration(binaryCfg);

        return igniteCfg;
    }

    /** {@inheritDoc} */
    @Override protected int serverCount() {
        // Always 0: this class starts its own nodes in beforeTest().
        // NOTE(review): presumably the base class uses this value to start servers itself - confirm.
        return 0;
    }

    /** {@inheritDoc} */
    @Override protected void beforeTest() throws Exception {
        // Starts SRV_CNT nodes before each test; the test methods then set clientMode and start the client.
        // NOTE(review): clientMode is set to true by the tests and never reset in this class -
        // confirm it is false again when this method runs for the next test.
        startGrids(SRV_CNT);
    }

    /** {@inheritDoc} */
    @Override protected void afterTest() throws Exception {
        // Every test starts its own nodes (see beforeTest() and the startGrid calls in the tests), so stop them all.
        stopAllGrids();
    }

    /**
     * Puts a value under a {@link TestNotAnnotatedKey} through the client cache of a transactional cache and
     * checks that it can be read from both the client and the server, before and after the client reconnects.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testReconnectClientNotAnnotatedAffinityKeyGet() throws Exception {
        clientMode = true;

        final Ignite clientNode = startGrid(SRV_CNT);

        assertTrue(clientNode.cluster().localNode().isClient());

        final Ignite srvNode = clientRouter(clientNode);

        CacheConfiguration<TestNotAnnotatedKey, Object> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);

        ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);

        final IgniteCache<TestNotAnnotatedKey, Object> cacheOnClient = clientNode.getOrCreateCache(ccfg);

        final IgniteCache<TestNotAnnotatedKey, Object> cacheOnSrv = srvNode.cache(DEFAULT_CACHE_NAME);

        assertNotNull(cacheOnSrv);

        final String expVal = "val";

        cacheOnClient.put(TestNotAnnotatedKey.of(1), expVal);

        assertEquals(expVal, cacheOnClient.get(TestNotAnnotatedKey.of(1)));

        assertEquals(expVal, cacheOnSrv.get(TestNotAnnotatedKey.of(1)));

        // Passed to reconnectClientNode(); checks that the server still holds the entry.
        Runnable srvCheck = () -> assertNotNull(cacheOnSrv.get(TestNotAnnotatedKey.of(1)));

        reconnectClientNode(clientNode, srvNode, srvCheck);

        assertEquals(expVal, cacheOnSrv.get(TestNotAnnotatedKey.of(1)));

        assertEquals(expVal, cacheOnClient.get(TestNotAnnotatedKey.of(1)));
    }

    /**
     * Checks that the partition computed for a {@link TestNotAnnotatedKey} is the same on the client and on
     * the server, before and after the client reconnects.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testReconnectClientNotAnnotatedAffinityKeyPartition() throws Exception {
        clientMode = true;

        client = startGrid(SRV_CNT);

        assertTrue(client.cluster().localNode().isClient());

        final Ignite srvNode = clientRouter(client);

        // Atomicity mode is left unset here; only the cache creation matters, not the returned proxy.
        CacheConfiguration<TestNotAnnotatedKey, Object> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);

        client.getOrCreateCache(ccfg);

        assertNotNull(srvNode.cache(DEFAULT_CACHE_NAME));

        final int expPart = partition(TestNotAnnotatedKey.of(1), client);

        assertEquals(expPart, partition(TestNotAnnotatedKey.of(1), srvNode));

        // Passed to reconnectClientNode(); checks the server-side partition mapping.
        Runnable srvCheck = () -> assertEquals(expPart, partition(TestNotAnnotatedKey.of(1), srvNode));

        reconnectClientNode(client, srvNode, srvCheck);

        assertEquals(expPart, partition(TestNotAnnotatedKey.of(1), srvNode));

        assertEquals(expPart, partition(TestNotAnnotatedKey.of(1), client));
    }

    /**
     * Puts a value under a {@link TestAnnotatedKey} through the client cache of a transactional cache and
     * checks that it can be read from both the client and the server, before and after the client reconnects.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testReconnectClientAnnotatedAffinityKeyGet() throws Exception {
        clientMode = true;

        final Ignite clientNode = startGrid(SRV_CNT);

        assertTrue(clientNode.cluster().localNode().isClient());

        final Ignite srvNode = clientRouter(clientNode);

        CacheConfiguration<TestAnnotatedKey, Object> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);

        ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);

        final IgniteCache<TestAnnotatedKey, Object> cacheOnClient = clientNode.getOrCreateCache(ccfg);

        final IgniteCache<TestAnnotatedKey, Object> cacheOnSrv = srvNode.cache(DEFAULT_CACHE_NAME);

        assertNotNull(cacheOnSrv);

        final String expVal = "val";

        cacheOnClient.put(TestAnnotatedKey.of(1), expVal);

        assertEquals(expVal, cacheOnClient.get(TestAnnotatedKey.of(1)));

        assertEquals(expVal, cacheOnSrv.get(TestAnnotatedKey.of(1)));

        // Passed to reconnectClientNode(); checks that the server still holds the entry.
        Runnable srvCheck = () -> assertNotNull(cacheOnSrv.get(TestAnnotatedKey.of(1)));

        reconnectClientNode(clientNode, srvNode, srvCheck);

        assertEquals(expVal, cacheOnSrv.get(TestAnnotatedKey.of(1)));

        assertEquals(expVal, cacheOnClient.get(TestAnnotatedKey.of(1)));
    }

    /**
     * Checks that the partition computed for a {@link TestAnnotatedKey} is the same on the client and on
     * the server, before and after the client reconnects.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testReconnectClientAnnotatedAffinityKeyPartition() throws Exception {
        clientMode = true;

        client = startGrid(SRV_CNT);

        assertTrue(client.cluster().localNode().isClient());

        final Ignite srvNode = clientRouter(client);

        // Atomicity mode is left unset here; only the cache creation matters, not the returned proxy.
        CacheConfiguration<TestAnnotatedKey, Object> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);

        client.getOrCreateCache(ccfg);

        assertNotNull(srvNode.cache(DEFAULT_CACHE_NAME));

        final int expPart = partition(TestAnnotatedKey.of(1), client);

        assertEquals(expPart, partition(TestAnnotatedKey.of(1), srvNode));

        // Passed to reconnectClientNode(); checks the server-side partition mapping.
        Runnable srvCheck = () -> assertEquals(expPart, partition(TestAnnotatedKey.of(1), srvNode));

        reconnectClientNode(client, srvNode, srvCheck);

        assertEquals(expPart, partition(TestAnnotatedKey.of(1), srvNode));

        assertEquals(expPart, partition(TestAnnotatedKey.of(1), client));
    }

    /**
     * Asks the given node which partition of the default cache a key maps to.
     *
     * @param key Cache key.
     * @param ign Node whose affinity API is queried.
     * @return Partition returned by {@code ign.affinity(DEFAULT_CACHE_NAME).partition(key)}.
     */
    private <K> int partition(K key, Ignite ign) {
        return ign.affinity(DEFAULT_CACHE_NAME).partition(key);
    }

    /**
     * Cache key without {@code @AffinityKeyMapped}: its affinity field is declared externally through a
     * {@code CacheKeyConfiguration} that refers to {@link #AFFINITY_KEY_FIELD}.
     */
    static class TestNotAnnotatedKey {
        /**
         * Name of the affinity key field. It has to match the actual field name declared below;
         * the previous value {@code "affinityKey"} did not correspond to any field of this class.
         */
        private static final String AFFINITY_KEY_FIELD = "notAnnotatedAffinityKey";

        /** Affinity key field (configured by name, not annotated). */
        private int notAnnotatedAffinityKey;

        /** Additional key field that is not the configured affinity field. */
        private int nonAffinityInfo;

        /**
         * @param affinityKey Value of the affinity key field.
         * @param nonAffinityInfo Value of the non-affinity field.
         */
        public TestNotAnnotatedKey(int affinityKey, int nonAffinityInfo) {
            this.notAnnotatedAffinityKey = affinityKey;
            this.nonAffinityInfo = nonAffinityInfo;
        }

        /**
         * @param affinityKey Value used for both fields of the key.
         * @return New key instance.
         */
        public static TestNotAnnotatedKey of(int affinityKey) {
            return new TestNotAnnotatedKey(affinityKey, affinityKey);
        }
    }

    /**
     * Cache key whose affinity field is designated with {@link AffinityKeyMapped}.
     */
    static class TestAnnotatedKey {
        /** Field marked as the affinity key by annotation. */
        @AffinityKeyMapped
        private int annotatedAffinityKey;

        /** Additional key field that carries no affinity annotation. */
        private int nonAffinityInfo;

        /**
         * @param affinityKey Value of the annotated affinity field.
         * @param nonAffinityInfo Value of the non-affinity field.
         */
        public TestAnnotatedKey(int affinityKey, int nonAffinityInfo) {
            annotatedAffinityKey = affinityKey;

            this.nonAffinityInfo = nonAffinityInfo;
        }

        /**
         * @param affinityKey Value used for both fields of the key.
         * @return New key instance.
         */
        public static TestAnnotatedKey of(int affinityKey) {
            final int sameForBothFields = affinityKey;

            return new TestAnnotatedKey(sameForBothFields, sameForBothFields);
        }
    }

    /**
     * Communication SPI that can hold back messages of registered classes addressed to registered nodes
     * and, on request, send the held messages later.
     */
    private static class TestCommunicationSpi extends TcpCommunicationSpi {
        /** Logger. */
        @LoggerResource
        private IgniteLogger log;

        /** Held back messages paired with their destination nodes. */
        private List<T2<ClusterNode, GridIoMessage>> blockedMsgs = new ArrayList<>();

        /** Message class mapped to the IDs of nodes for which that class is blocked. */
        private Map<Class<?>, Set<UUID>> blockCls = new HashMap<>();

        /** {@inheritDoc} */
        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
            throws IgniteSpiException {
            boolean held = msg instanceof GridIoMessage && holdBack(node, (GridIoMessage)msg);

            if (!held)
                super.sendMessage(node, msg, ackClosure);
        }

        /**
         * Stores the message instead of sending it if its payload class is blocked for the destination node.
         *
         * @param dest Destination node.
         * @param ioMsg Message about to be sent.
         * @return {@code True} if the message was stored and must not be sent now.
         */
        private synchronized boolean holdBack(ClusterNode dest, GridIoMessage ioMsg) {
            Object payload = ioMsg.message();

            Set<UUID> blockedIds = blockCls.get(payload.getClass());

            if (!F.contains(blockedIds, dest.id()))
                return false;

            log.info("Block message [node=" +
                dest.attribute(IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME) + ", msg=" + payload + ']');

            blockedMsgs.add(new T2<>(dest, ioMsg));

            return true;
        }

        /**
         * Registers a message class to be held back for the given node.
         *
         * @param cls Message class.
         * @param nodeId Node ID.
         */
        synchronized void blockMessages(Class<?> cls, UUID nodeId) {
            blockCls.computeIfAbsent(cls, k -> new HashSet<>()).add(nodeId);
        }

        /**
         * Removes all blocking rules and drops the held messages, sending them first if requested.
         *
         * @param snd Send messages flag.
         */
        synchronized void stopBlock(boolean snd) {
            blockCls.clear();

            if (snd) {
                for (T2<ClusterNode, GridIoMessage> held : blockedMsgs) {
                    ClusterNode dest = held.get1();
                    GridIoMessage ioMsg = held.get2();

                    log.info("Send blocked message: [node=" +
                        dest.attribute(IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME) +
                        ", msg=" + ioMsg.message() + ']');

                    super.sendMessage(dest, ioMsg);
                }
            }

            blockedMsgs.clear();
        }
    }
}




--
This message was sent by Atlassian Jira
(v8.20.10#820010)