You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@ignite.apache.org by "Sergey Kosarev (Jira)" <ji...@apache.org> on 2023/03/07 09:18:00 UTC

[jira] [Updated] (IGNITE-18976) Affinity broken on the client after reconnection

     [ https://issues.apache.org/jira/browse/IGNITE-18976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sergey Kosarev updated IGNITE-18976:
------------------------------------
    Attachment: IgniteClientReconnectAffinityTest.java

> Affinity broken on the  client after reconnection
> -------------------------------------------------
>
>                 Key: IGNITE-18976
>                 URL: https://issues.apache.org/jira/browse/IGNITE-18976
>             Project: Ignite
>          Issue Type: Bug
>          Components: binary
>    Affects Versions: 2.16
>            Reporter: Sergey Kosarev
>            Priority: Major
>         Attachments: IgniteClientReconnectAffinityTest.java
>
>
> /*
>  * Copyright 2019 GridGain Systems, Inc. and Contributors.
>  *
>  * Licensed under the GridGain Community Edition License (the "License");
>  * you may not use this file except in compliance with the License.
>  * You may obtain a copy of the License at
>  *
>  *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
> package org.apache.ignite.internal;
> import org.apache.ignite.Ignite;
> import org.apache.ignite.IgniteCache;
> import org.apache.ignite.IgniteException;
> import org.apache.ignite.IgniteLogger;
> import org.apache.ignite.binary.BinaryTypeConfiguration;
> import org.apache.ignite.cache.CacheAtomicityMode;
> import org.apache.ignite.cache.CacheKeyConfiguration;
> import org.apache.ignite.cache.affinity.AffinityKeyMapped;
> import org.apache.ignite.cluster.ClusterNode;
> import org.apache.ignite.configuration.BinaryConfiguration;
> import org.apache.ignite.configuration.CacheConfiguration;
> import org.apache.ignite.configuration.IgniteConfiguration;
> import org.apache.ignite.internal.managers.communication.GridIoMessage;
> import org.apache.ignite.internal.util.typedef.F;
> import org.apache.ignite.internal.util.typedef.T2;
> import org.apache.ignite.lang.IgniteInClosure;
> import org.apache.ignite.plugin.extensions.communication.Message;
> import org.apache.ignite.resources.LoggerResource;
> import org.apache.ignite.spi.IgniteSpiException;
> import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
> import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
> import org.junit.Test;
> import java.util.ArrayList;
> import java.util.Arrays;
> import java.util.HashMap;
> import java.util.HashSet;
> import java.util.List;
> import java.util.Map;
> import java.util.Set;
> import java.util.UUID;
> /**
>  *
>  */
> public class IgniteClientReconnectAffinityTest extends IgniteClientReconnectAbstractTest {
>     /** */
>     private static final int SRV_CNT = 1;
>     /** */
>     private UUID nodeId;
>     private Ignite client;
>     /** {@inheritDoc} */
>     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
>         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
>         TestCommunicationSpi commSpi = new TestCommunicationSpi();
>         commSpi.setSharedMemoryPort(-1);
>         cfg.setCommunicationSpi(commSpi);
>         cfg.setPeerClassLoadingEnabled(false);
>         ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setNetworkTimeout(5000);
>         cfg.setCacheKeyConfiguration(new CacheKeyConfiguration(TestNotAnnotatedKey.class.getName(), TestNotAnnotatedKey.AFFINITY_KEY_FIELD))
>             .setBinaryConfiguration(
>                     new BinaryConfiguration()
>                             .setTypeConfigurations(Arrays.asList(
>                                     new BinaryTypeConfiguration()
>                                             .setTypeName(TestNotAnnotatedKey.class.getName()),
>                                     new BinaryTypeConfiguration()
>                                             .setTypeName(TestAnnotatedKey.class.getName())
>                             ))
>             )
>           ;
>         return cfg;
>     }
>     /** {@inheritDoc} */
>     @Override protected int serverCount() {
>         return 0;
>     }
>     /** {@inheritDoc} */
>     @Override protected void beforeTest() throws Exception {
>         startGrids(SRV_CNT);
>     }
>     /** {@inheritDoc} */
>     @Override protected void afterTest() throws Exception {
>         stopAllGrids();
>     }
>     @Test
>     public void testReconnectClientNotAnnotatedAffinityKeyGet() throws Exception {
>         clientMode = true;
>         final Ignite client = startGrid(SRV_CNT);
>         assertTrue(client.cluster().localNode().isClient());
>         final Ignite srv = clientRouter(client);
>         final IgniteCache<TestNotAnnotatedKey, Object> clientCache = client.getOrCreateCache(new CacheConfiguration<TestNotAnnotatedKey, Object>(DEFAULT_CACHE_NAME)
>                 .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
>             );
>         final IgniteCache<TestNotAnnotatedKey, Object> srvCache = srv.cache(DEFAULT_CACHE_NAME);
>         assertNotNull(srvCache);
>         final String val = "val";
>         clientCache.put(TestNotAnnotatedKey.of(1), val);
>         assertEquals(val, clientCache.get(TestNotAnnotatedKey.of(1)));
>         assertEquals(val, srvCache.get(TestNotAnnotatedKey.of(1)));
>         reconnectClientNode(client, srv, new Runnable() {
>             @Override public void run() {
>                 assertNotNull(srvCache.get(TestNotAnnotatedKey.of(1)));
>             }
>         });
>         assertEquals(val, srvCache.get(TestNotAnnotatedKey.of(1)));
>         assertEquals(val, clientCache.get(TestNotAnnotatedKey.of(1)));
>     }
>     @Test
>     public void testReconnectClientNotAnnotatedAffinityKeyPartition() throws Exception {
>         clientMode = true;
>         client = startGrid(SRV_CNT);
>         assertTrue(client.cluster().localNode().isClient());
>         final Ignite srv = clientRouter(client);
>         IgniteCache<TestNotAnnotatedKey, Object> clientCache = client.getOrCreateCache(new CacheConfiguration<TestNotAnnotatedKey, Object>(DEFAULT_CACHE_NAME)
> //                .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
>         );
>         final IgniteCache<TestNotAnnotatedKey, Object> srvCache = srv.cache(DEFAULT_CACHE_NAME);
>         assertNotNull(srvCache);
>         int partition = partition(TestNotAnnotatedKey.of(1), client);
>         assertEquals(partition, partition(TestNotAnnotatedKey.of(1), srv));
>         reconnectClientNode(client, srv, new Runnable() {
>             @Override public void run() {
>                 assertEquals(partition, partition(TestNotAnnotatedKey.of(1), srv));
>             }
>         });
>         assertEquals(partition, partition(TestNotAnnotatedKey.of(1), srv));
>         assertEquals(partition, partition(TestNotAnnotatedKey.of(1), client));
>     }
>     @Test
>     public void testReconnectClientAnnotatedAffinityKeyGet() throws Exception {
>         clientMode = true;
>         final Ignite client = startGrid(SRV_CNT);
>         assertTrue(client.cluster().localNode().isClient());
>         final Ignite srv = clientRouter(client);
>         final IgniteCache<TestAnnotatedKey, Object> clientCache = client.getOrCreateCache(new CacheConfiguration<TestAnnotatedKey, Object>(DEFAULT_CACHE_NAME)
>                 .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
>         );
>         final IgniteCache<TestAnnotatedKey, Object> srvCache = srv.cache(DEFAULT_CACHE_NAME);
>         assertNotNull(srvCache);
>         final String val = "val";
>         clientCache.put(TestAnnotatedKey.of(1), val);
>         assertEquals(val, clientCache.get(TestAnnotatedKey.of(1)));
>         assertEquals(val, srvCache.get(TestAnnotatedKey.of(1)));
>         reconnectClientNode(client, srv, new Runnable() {
>             @Override public void run() {
>                 assertNotNull(srvCache.get(TestAnnotatedKey.of(1)));
>             }
>         });
>         assertEquals(val, srvCache.get(TestAnnotatedKey.of(1)));
>         assertEquals(val, clientCache.get(TestAnnotatedKey.of(1)));
>     }
>     @Test
>     public void testReconnectClientAnnotatedAffinityKeyPartition() throws Exception {
>         clientMode = true;
>         client = startGrid(SRV_CNT);
>         assertTrue(client.cluster().localNode().isClient());
>         final Ignite srv = clientRouter(client);
>         IgniteCache<TestAnnotatedKey, Object> clientCache = client.getOrCreateCache(new CacheConfiguration<TestAnnotatedKey, Object>(DEFAULT_CACHE_NAME)
> //                .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
>         );
>         final IgniteCache<TestAnnotatedKey, Object> srvCache = srv.cache(DEFAULT_CACHE_NAME);
>         assertNotNull(srvCache);
>         int partition = partition(TestAnnotatedKey.of(1), client);
>         assertEquals(partition, partition(TestAnnotatedKey.of(1), srv));
>         reconnectClientNode(client, srv, new Runnable() {
>             @Override public void run() {
>                 assertEquals(partition, partition(TestAnnotatedKey.of(1), srv));
>             }
>         });
>         assertEquals(partition, partition(TestAnnotatedKey.of(1), srv));
>         assertEquals(partition, partition(TestAnnotatedKey.of(1), client));
>     }
>     private <K> int partition(K key, Ignite ign) {
>         return ign.affinity(DEFAULT_CACHE_NAME).partition(key);
>     }
>     static class TestNotAnnotatedKey {
>         private static final String AFFINITY_KEY_FIELD = "affinityKey";
>         private int notAnnotatedAffinityKey;
>         private int nonAffinityInfo;
>         public TestNotAnnotatedKey(int affinityKey, int nonAffinityInfo) {
>             this.notAnnotatedAffinityKey = affinityKey;
>             this.nonAffinityInfo = nonAffinityInfo;
>         }
>         public static TestNotAnnotatedKey of(int affinityKey) {
>             return new TestNotAnnotatedKey(affinityKey, affinityKey);
>         }
>     }
>     static class TestAnnotatedKey {
>         @AffinityKeyMapped
>         private int annotatedAffinityKey;
>         private int nonAffinityInfo;
>         public TestAnnotatedKey(int affinityKey, int nonAffinityInfo) {
>             this.annotatedAffinityKey = affinityKey;
>             this.nonAffinityInfo = nonAffinityInfo;
>         }
>         public static TestAnnotatedKey of(int affinityKey) {
>             return new TestAnnotatedKey(affinityKey, affinityKey);
>         }
>     }
>     /**
>      *
>      */
>     private static class TestCommunicationSpi extends TcpCommunicationSpi {
>         /** */
>         @LoggerResource
>         private IgniteLogger log;
>         /** */
>         private List<T2<ClusterNode, GridIoMessage>> blockedMsgs = new ArrayList<>();
>         /** */
>         private Map<Class<?>, Set<UUID>> blockCls = new HashMap<>();
>         /** {@inheritDoc} */
>         @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
>             throws IgniteSpiException {
>             if (msg instanceof GridIoMessage) {
>                 Object msg0 = ((GridIoMessage)msg).message();
>                 synchronized (this) {
>                     Set<UUID> blockNodes = blockCls.get(msg0.getClass());
>                     if (F.contains(blockNodes, node.id())) {
>                         log.info("Block message [node=" +
>                             node.attribute(IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME) + ", msg=" + msg0 + ']');
>                         blockedMsgs.add(new T2<>(node, (GridIoMessage)msg));
>                         return;
>                     }
>                 }
>             }
>             super.sendMessage(node, msg, ackClosure);
>         }
>         /**
>          * @param cls Message class.
>          * @param nodeId Node ID.
>          */
>         void blockMessages(Class<?> cls, UUID nodeId) {
>             synchronized (this) {
>                 Set<UUID> set = blockCls.get(cls);
>                 if (set == null) {
>                     set = new HashSet<>();
>                     blockCls.put(cls, set);
>                 }
>                 set.add(nodeId);
>             }
>         }
>         /**
>          * @param snd Send messages flag.
>          */
>         void stopBlock(boolean snd) {
>             synchronized (this) {
>                 blockCls.clear();
>                 if (snd) {
>                     for (T2<ClusterNode, GridIoMessage> msg : blockedMsgs) {
>                         ClusterNode node = msg.get1();
>                         log.info("Send blocked message: [node=" +
>                             node.attribute(IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME) +
>                             ", msg=" + msg.get2().message() + ']');
>                         super.sendMessage(msg.get1(), msg.get2());
>                     }
>                 }
>                 blockedMsgs.clear();
>             }
>         }
>     }
> }



--
This message was sent by Atlassian Jira
(v8.20.10#820010)