You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@ignite.apache.org by "Sergey Kosarev (Jira)" <ji...@apache.org> on 2023/03/07 09:18:00 UTC
[jira] [Updated] (IGNITE-18976) Affinity broken on the client after reconnection
[ https://issues.apache.org/jira/browse/IGNITE-18976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sergey Kosarev updated IGNITE-18976:
------------------------------------
Attachment: IgniteClientReconnectAffinityTest.java
> Affinity broken on the client after reconnection
> -------------------------------------------------
>
> Key: IGNITE-18976
> URL: https://issues.apache.org/jira/browse/IGNITE-18976
> Project: Ignite
> Issue Type: Bug
> Components: binary
> Affects Versions: 2.16
> Reporter: Sergey Kosarev
> Priority: Major
> Attachments: IgniteClientReconnectAffinityTest.java
>
>
> /*
> * Copyright 2019 GridGain Systems, Inc. and Contributors.
> *
> * Licensed under the GridGain Community Edition License (the "License");
> * you may not use this file except in compliance with the License.
> * You may obtain a copy of the License at
> *
> * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
> *
> * Unless required by applicable law or agreed to in writing, software
> * distributed under the License is distributed on an "AS IS" BASIS,
> * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> * See the License for the specific language governing permissions and
> * limitations under the License.
> */
> package org.apache.ignite.internal;
> import org.apache.ignite.Ignite;
> import org.apache.ignite.IgniteCache;
> import org.apache.ignite.IgniteException;
> import org.apache.ignite.IgniteLogger;
> import org.apache.ignite.binary.BinaryTypeConfiguration;
> import org.apache.ignite.cache.CacheAtomicityMode;
> import org.apache.ignite.cache.CacheKeyConfiguration;
> import org.apache.ignite.cache.affinity.AffinityKeyMapped;
> import org.apache.ignite.cluster.ClusterNode;
> import org.apache.ignite.configuration.BinaryConfiguration;
> import org.apache.ignite.configuration.CacheConfiguration;
> import org.apache.ignite.configuration.IgniteConfiguration;
> import org.apache.ignite.internal.managers.communication.GridIoMessage;
> import org.apache.ignite.internal.util.typedef.F;
> import org.apache.ignite.internal.util.typedef.T2;
> import org.apache.ignite.lang.IgniteInClosure;
> import org.apache.ignite.plugin.extensions.communication.Message;
> import org.apache.ignite.resources.LoggerResource;
> import org.apache.ignite.spi.IgniteSpiException;
> import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
> import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
> import org.junit.Test;
> import java.util.ArrayList;
> import java.util.Arrays;
> import java.util.HashMap;
> import java.util.HashSet;
> import java.util.List;
> import java.util.Map;
> import java.util.Set;
> import java.util.UUID;
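> /**
> * Checks that cache reads and key-to-partition mapping seen by a client node and by its router server
> * stay the same after the client reconnects, both for a key class whose affinity field is marked with
> * {@link AffinityKeyMapped} and for one whose affinity field is named through {@link CacheKeyConfiguration}.
> */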
> public class IgniteClientReconnectAffinityTest extends IgniteClientReconnectAbstractTest {
> /** Number of server nodes started before each test; also used as the index of the client node. */
> private static final int SRV_CNT = 1;
>
> /** Client node handle. */
> private Ignite client;
> /**
>  * {@inheritDoc}
>  * <p>
>  * On top of the parent configuration this installs a {@link TestCommunicationSpi}, switches peer class
>  * loading off, sets the discovery SPI network timeout to {@code 5000}, adds a
>  * {@link CacheKeyConfiguration} for {@link TestNotAnnotatedKey} built from
>  * {@code TestNotAnnotatedKey.AFFINITY_KEY_FIELD}, and lists both test key classes as binary types.
>  */
> @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
>     IgniteConfiguration igniteCfg = super.getConfiguration(igniteInstanceName);
>
>     TestCommunicationSpi spi = new TestCommunicationSpi();
>
>     spi.setSharedMemoryPort(-1);
>
>     igniteCfg.setCommunicationSpi(spi);
>     igniteCfg.setPeerClassLoadingEnabled(false);
>
>     TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)igniteCfg.getDiscoverySpi();
>
>     discoSpi.setNetworkTimeout(5000);
>
>     // Affinity field of the non-annotated key is supplied by name through configuration.
>     igniteCfg.setCacheKeyConfiguration(
>         new CacheKeyConfiguration(TestNotAnnotatedKey.class.getName(), TestNotAnnotatedKey.AFFINITY_KEY_FIELD));
>
>     BinaryTypeConfiguration notAnnotatedType =
>         new BinaryTypeConfiguration().setTypeName(TestNotAnnotatedKey.class.getName());
>
>     BinaryTypeConfiguration annotatedType =
>         new BinaryTypeConfiguration().setTypeName(TestAnnotatedKey.class.getName());
>
>     BinaryConfiguration binCfg =
>         new BinaryConfiguration().setTypeConfigurations(Arrays.asList(notAnnotatedType, annotatedType));
>
>     igniteCfg.setBinaryConfiguration(binCfg);
>
>     return igniteCfg;
> }
> /**
>  * {@inheritDoc}
>  *
>  * @return Always {@code 0}; this class starts its server nodes itself in {@link #beforeTest()}.
>  */
> @Override protected int serverCount() {
>     return 0;
> }
> /**
>  * {@inheritDoc}
>  * <p>
>  * Starts {@link #SRV_CNT} grid node(s) ahead of every test method.
>  */
> @Override protected void beforeTest() throws Exception {
>     startGrids(SRV_CNT);
> }
> /**
>  * {@inheritDoc}
>  * <p>
>  * Stops all grids once a test method has finished.
>  */
> @Override protected void afterTest() throws Exception {
>     stopAllGrids();
> }
> @Test
> public void testReconnectClientNotAnnotatedAffinityKeyGet() throws Exception {
> clientMode = true;
> final Ignite client = startGrid(SRV_CNT);
> assertTrue(client.cluster().localNode().isClient());
> final Ignite srv = clientRouter(client);
> final IgniteCache<TestNotAnnotatedKey, Object> clientCache = client.getOrCreateCache(new CacheConfiguration<TestNotAnnotatedKey, Object>(DEFAULT_CACHE_NAME)
> .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
> );
> final IgniteCache<TestNotAnnotatedKey, Object> srvCache = srv.cache(DEFAULT_CACHE_NAME);
> assertNotNull(srvCache);
> final String val = "val";
> clientCache.put(TestNotAnnotatedKey.of(1), val);
> assertEquals(val, clientCache.get(TestNotAnnotatedKey.of(1)));
> assertEquals(val, srvCache.get(TestNotAnnotatedKey.of(1)));
> reconnectClientNode(client, srv, new Runnable() {
> @Override public void run() {
> assertNotNull(srvCache.get(TestNotAnnotatedKey.of(1)));
> }
> });
> assertEquals(val, srvCache.get(TestNotAnnotatedKey.of(1)));
> assertEquals(val, clientCache.get(TestNotAnnotatedKey.of(1)));
> }
> /**
>  * Records the partition the client maps {@code TestNotAnnotatedKey.of(1)} to and checks that the server
>  * maps the key to the same partition before {@code reconnectClientNode}, inside the callback passed to
>  * it, and afterwards, and that the client still maps the key to that partition afterwards.
>  * The cache is created with a default {@link CacheConfiguration} (no atomicity mode is set).
>  *
>  * @throws Exception If failed.
>  */
> @Test
> public void testReconnectClientNotAnnotatedAffinityKeyPartition() throws Exception {
>     clientMode = true;
>
>     final Ignite client = startGrid(SRV_CNT);
>
>     assertTrue(client.cluster().localNode().isClient());
>
>     final Ignite srv = clientRouter(client);
>
>     // Only the side effect matters here: the cache has to exist before affinity is queried.
>     client.getOrCreateCache(new CacheConfiguration<TestNotAnnotatedKey, Object>(DEFAULT_CACHE_NAME));
>
>     assertNotNull(srv.cache(DEFAULT_CACHE_NAME));
>
>     final int expPart = partition(TestNotAnnotatedKey.of(1), client);
>
>     assertEquals(expPart, partition(TestNotAnnotatedKey.of(1), srv));
>
>     reconnectClientNode(client, srv, new Runnable() {
>         @Override public void run() {
>             assertEquals(expPart, partition(TestNotAnnotatedKey.of(1), srv));
>         }
>     });
>
>     assertEquals(expPart, partition(TestNotAnnotatedKey.of(1), srv));
>     assertEquals(expPart, partition(TestNotAnnotatedKey.of(1), client));
> }
> @Test
> public void testReconnectClientAnnotatedAffinityKeyGet() throws Exception {
> clientMode = true;
> final Ignite client = startGrid(SRV_CNT);
> assertTrue(client.cluster().localNode().isClient());
> final Ignite srv = clientRouter(client);
> final IgniteCache<TestAnnotatedKey, Object> clientCache = client.getOrCreateCache(new CacheConfiguration<TestAnnotatedKey, Object>(DEFAULT_CACHE_NAME)
> .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
> );
> final IgniteCache<TestAnnotatedKey, Object> srvCache = srv.cache(DEFAULT_CACHE_NAME);
> assertNotNull(srvCache);
> final String val = "val";
> clientCache.put(TestAnnotatedKey.of(1), val);
> assertEquals(val, clientCache.get(TestAnnotatedKey.of(1)));
> assertEquals(val, srvCache.get(TestAnnotatedKey.of(1)));
> reconnectClientNode(client, srv, new Runnable() {
> @Override public void run() {
> assertNotNull(srvCache.get(TestAnnotatedKey.of(1)));
> }
> });
> assertEquals(val, srvCache.get(TestAnnotatedKey.of(1)));
> assertEquals(val, clientCache.get(TestAnnotatedKey.of(1)));
> }
> /**
>  * Records the partition the client maps {@code TestAnnotatedKey.of(1)} to and checks that the server maps
>  * the key to the same partition before {@code reconnectClientNode}, inside the callback passed to it,
>  * and afterwards, and that the client still maps the key to that partition afterwards.
>  * The cache is created with a default {@link CacheConfiguration} (no atomicity mode is set).
>  *
>  * @throws Exception If failed.
>  */
> @Test
> public void testReconnectClientAnnotatedAffinityKeyPartition() throws Exception {
>     clientMode = true;
>
>     final Ignite client = startGrid(SRV_CNT);
>
>     assertTrue(client.cluster().localNode().isClient());
>
>     final Ignite srv = clientRouter(client);
>
>     // Only the side effect matters here: the cache has to exist before affinity is queried.
>     client.getOrCreateCache(new CacheConfiguration<TestAnnotatedKey, Object>(DEFAULT_CACHE_NAME));
>
>     assertNotNull(srv.cache(DEFAULT_CACHE_NAME));
>
>     final int expPart = partition(TestAnnotatedKey.of(1), client);
>
>     assertEquals(expPart, partition(TestAnnotatedKey.of(1), srv));
>
>     reconnectClientNode(client, srv, new Runnable() {
>         @Override public void run() {
>             assertEquals(expPart, partition(TestAnnotatedKey.of(1), srv));
>         }
>     });
>
>     assertEquals(expPart, partition(TestAnnotatedKey.of(1), srv));
>     assertEquals(expPart, partition(TestAnnotatedKey.of(1), client));
> }
> /**
>  * Asks the given node's affinity for {@code DEFAULT_CACHE_NAME} which partition a key maps to.
>  *
>  * @param key Key to map.
>  * @param ign Node whose affinity is queried.
>  * @param <K> Key type.
>  * @return Partition number reported for the key.
>  */
> private <K> int partition(K key, Ignite ign) {
>     return ign.<K>affinity(DEFAULT_CACHE_NAME).partition(key);
> }
> /**
>  * Key class without an affinity annotation; its affinity field is meant to be supplied by name through
>  * configuration via {@link #AFFINITY_KEY_FIELD}.
>  */
> static class TestNotAnnotatedKey {
>     /**
>      * Field name handed to the cache key configuration.
>      * NOTE(review): the value is "affinityKey" while the field declared below is
>      * {@code notAnnotatedAffinityKey} - confirm whether this mismatch is intended by the reproducer.
>      */
>     private static final String AFFINITY_KEY_FIELD = "affinityKey";
>
>     /** Value intended to drive affinity. */
>     private int notAnnotatedAffinityKey;
>
>     /** Extra payload; written by the constructor and not read anywhere in this class. */
>     private int nonAffinityInfo;
>
>     /**
>      * @param affKeyVal Value stored in {@code notAnnotatedAffinityKey}.
>      * @param info Value stored in {@code nonAffinityInfo}.
>      */
>     public TestNotAnnotatedKey(int affKeyVal, int info) {
>         notAnnotatedAffinityKey = affKeyVal;
>         nonAffinityInfo = info;
>     }
>
>     /**
>      * @param affinityKey Value used for both fields of the new key.
>      * @return New key instance.
>      */
>     public static TestNotAnnotatedKey of(int affinityKey) {
>         return new TestNotAnnotatedKey(affinityKey, affinityKey);
>     }
> }
> /**
>  * Key class whose affinity field is marked with {@link AffinityKeyMapped}.
>  */
> static class TestAnnotatedKey {
>     /** Field carrying the affinity annotation. */
>     @AffinityKeyMapped
>     private int annotatedAffinityKey;
>
>     /** Extra payload; written by the constructor and not read anywhere in this class. */
>     private int nonAffinityInfo;
>
>     /**
>      * @param affKeyVal Value stored in {@code annotatedAffinityKey}.
>      * @param info Value stored in {@code nonAffinityInfo}.
>      */
>     public TestAnnotatedKey(int affKeyVal, int info) {
>         annotatedAffinityKey = affKeyVal;
>         nonAffinityInfo = info;
>     }
>
>     /**
>      * @param affinityKey Value used for both fields of the new key.
>      * @return New key instance.
>      */
>     public static TestAnnotatedKey of(int affinityKey) {
>         return new TestAnnotatedKey(affinityKey, affinityKey);
>     }
> }
> /**
> *
> */
> private static class TestCommunicationSpi extends TcpCommunicationSpi {
> /** */
> @LoggerResource
> private IgniteLogger log;
> /** */
> private List<T2<ClusterNode, GridIoMessage>> blockedMsgs = new ArrayList<>();
> /** */
> private Map<Class<?>, Set<UUID>> blockCls = new HashMap<>();
> /** {@inheritDoc} */
> @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
> throws IgniteSpiException {
> if (msg instanceof GridIoMessage) {
> Object msg0 = ((GridIoMessage)msg).message();
> synchronized (this) {
> Set<UUID> blockNodes = blockCls.get(msg0.getClass());
> if (F.contains(blockNodes, node.id())) {
> log.info("Block message [node=" +
> node.attribute(IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME) + ", msg=" + msg0 + ']');
> blockedMsgs.add(new T2<>(node, (GridIoMessage)msg));
> return;
> }
> }
> }
> super.sendMessage(node, msg, ackClosure);
> }
> /**
> * @param cls Message class.
> * @param nodeId Node ID.
> */
> void blockMessages(Class<?> cls, UUID nodeId) {
> synchronized (this) {
> Set<UUID> set = blockCls.get(cls);
> if (set == null) {
> set = new HashSet<>();
> blockCls.put(cls, set);
> }
> set.add(nodeId);
> }
> }
> /**
> * @param snd Send messages flag.
> */
> void stopBlock(boolean snd) {
> synchronized (this) {
> blockCls.clear();
> if (snd) {
> for (T2<ClusterNode, GridIoMessage> msg : blockedMsgs) {
> ClusterNode node = msg.get1();
> log.info("Send blocked message: [node=" +
> node.attribute(IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME) +
> ", msg=" + msg.get2().message() + ']');
> super.sendMessage(msg.get1(), msg.get2());
> }
> }
> blockedMsgs.clear();
> }
> }
> }
> }
--
This message was sent by Atlassian Jira
(v8.20.10#820010)