You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@ignite.apache.org by "Sergey Kosarev (Jira)" <ji...@apache.org> on 2023/03/07 08:12:00 UTC
[jira] [Created] (IGNITE-18976) Affinity broken on the client after reconnection
Sergey Kosarev created IGNITE-18976:
---------------------------------------
Summary: Affinity broken on the client after reconnection
Key: IGNITE-18976
URL: https://issues.apache.org/jira/browse/IGNITE-18976
Project: Ignite
Issue Type: Bug
Components: binary
Affects Versions: 2.16
Reporter: Sergey Kosarev
/*
* Copyright 2019 GridGain Systems, Inc. and Contributors.
*
* Licensed under the GridGain Community Edition License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.binary.BinaryTypeConfiguration;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheKeyConfiguration;
import org.apache.ignite.cache.affinity.AffinityKeyMapped;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.BinaryConfiguration;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
/**
*
*/
public class IgniteClientReconnectAffinityTest extends IgniteClientReconnectAbstractTest {
/** */
private static final int SRV_CNT = 1;
/** */
private UUID nodeId;
private Ignite client;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
TestCommunicationSpi commSpi = new TestCommunicationSpi();
commSpi.setSharedMemoryPort(-1);
cfg.setCommunicationSpi(commSpi);
cfg.setPeerClassLoadingEnabled(false);
((TcpDiscoverySpi)cfg.getDiscoverySpi()).setNetworkTimeout(5000);
cfg.setCacheKeyConfiguration(new CacheKeyConfiguration(TestNotAnnotatedKey.class.getName(), TestNotAnnotatedKey.AFFINITY_KEY_FIELD))
.setBinaryConfiguration(
new BinaryConfiguration()
.setTypeConfigurations(Arrays.asList(
new BinaryTypeConfiguration()
.setTypeName(TestNotAnnotatedKey.class.getName()),
new BinaryTypeConfiguration()
.setTypeName(TestAnnotatedKey.class.getName())
))
)
;
return cfg;
}
/** {@inheritDoc} */
@Override protected int serverCount() {
return 0;
}
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
startGrids(SRV_CNT);
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
stopAllGrids();
}
@Test
public void testReconnectClientNotAnnotatedAffinityKeyGet() throws Exception {
clientMode = true;
final Ignite client = startGrid(SRV_CNT);
assertTrue(client.cluster().localNode().isClient());
final Ignite srv = clientRouter(client);
final IgniteCache<TestNotAnnotatedKey, Object> clientCache = client.getOrCreateCache(new CacheConfiguration<TestNotAnnotatedKey, Object>(DEFAULT_CACHE_NAME)
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
);
final IgniteCache<TestNotAnnotatedKey, Object> srvCache = srv.cache(DEFAULT_CACHE_NAME);
assertNotNull(srvCache);
final String val = "val";
clientCache.put(TestNotAnnotatedKey.of(1), val);
assertEquals(val, clientCache.get(TestNotAnnotatedKey.of(1)));
assertEquals(val, srvCache.get(TestNotAnnotatedKey.of(1)));
reconnectClientNode(client, srv, new Runnable() {
@Override public void run() {
assertNotNull(srvCache.get(TestNotAnnotatedKey.of(1)));
}
});
assertEquals(val, srvCache.get(TestNotAnnotatedKey.of(1)));
assertEquals(val, clientCache.get(TestNotAnnotatedKey.of(1)));
}
@Test
public void testReconnectClientNotAnnotatedAffinityKeyPartition() throws Exception {
clientMode = true;
client = startGrid(SRV_CNT);
assertTrue(client.cluster().localNode().isClient());
final Ignite srv = clientRouter(client);
IgniteCache<TestNotAnnotatedKey, Object> clientCache = client.getOrCreateCache(new CacheConfiguration<TestNotAnnotatedKey, Object>(DEFAULT_CACHE_NAME)
// .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
);
final IgniteCache<TestNotAnnotatedKey, Object> srvCache = srv.cache(DEFAULT_CACHE_NAME);
assertNotNull(srvCache);
int partition = partition(TestNotAnnotatedKey.of(1), client);
assertEquals(partition, partition(TestNotAnnotatedKey.of(1), srv));
reconnectClientNode(client, srv, new Runnable() {
@Override public void run() {
assertEquals(partition, partition(TestNotAnnotatedKey.of(1), srv));
}
});
assertEquals(partition, partition(TestNotAnnotatedKey.of(1), srv));
assertEquals(partition, partition(TestNotAnnotatedKey.of(1), client));
}
@Test
public void testReconnectClientAnnotatedAffinityKeyGet() throws Exception {
clientMode = true;
final Ignite client = startGrid(SRV_CNT);
assertTrue(client.cluster().localNode().isClient());
final Ignite srv = clientRouter(client);
final IgniteCache<TestAnnotatedKey, Object> clientCache = client.getOrCreateCache(new CacheConfiguration<TestAnnotatedKey, Object>(DEFAULT_CACHE_NAME)
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
);
final IgniteCache<TestAnnotatedKey, Object> srvCache = srv.cache(DEFAULT_CACHE_NAME);
assertNotNull(srvCache);
final String val = "val";
clientCache.put(TestAnnotatedKey.of(1), val);
assertEquals(val, clientCache.get(TestAnnotatedKey.of(1)));
assertEquals(val, srvCache.get(TestAnnotatedKey.of(1)));
reconnectClientNode(client, srv, new Runnable() {
@Override public void run() {
assertNotNull(srvCache.get(TestAnnotatedKey.of(1)));
}
});
assertEquals(val, srvCache.get(TestAnnotatedKey.of(1)));
assertEquals(val, clientCache.get(TestAnnotatedKey.of(1)));
}
@Test
public void testReconnectClientAnnotatedAffinityKeyPartition() throws Exception {
clientMode = true;
client = startGrid(SRV_CNT);
assertTrue(client.cluster().localNode().isClient());
final Ignite srv = clientRouter(client);
IgniteCache<TestAnnotatedKey, Object> clientCache = client.getOrCreateCache(new CacheConfiguration<TestAnnotatedKey, Object>(DEFAULT_CACHE_NAME)
// .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
);
final IgniteCache<TestAnnotatedKey, Object> srvCache = srv.cache(DEFAULT_CACHE_NAME);
assertNotNull(srvCache);
int partition = partition(TestAnnotatedKey.of(1), client);
assertEquals(partition, partition(TestAnnotatedKey.of(1), srv));
reconnectClientNode(client, srv, new Runnable() {
@Override public void run() {
assertEquals(partition, partition(TestAnnotatedKey.of(1), srv));
}
});
assertEquals(partition, partition(TestAnnotatedKey.of(1), srv));
assertEquals(partition, partition(TestAnnotatedKey.of(1), client));
}
private <K> int partition(K key, Ignite ign) {
return ign.affinity(DEFAULT_CACHE_NAME).partition(key);
}
static class TestNotAnnotatedKey {
private static final String AFFINITY_KEY_FIELD = "affinityKey";
private int notAnnotatedAffinityKey;
private int nonAffinityInfo;
public TestNotAnnotatedKey(int affinityKey, int nonAffinityInfo) {
this.notAnnotatedAffinityKey = affinityKey;
this.nonAffinityInfo = nonAffinityInfo;
}
public static TestNotAnnotatedKey of(int affinityKey) {
return new TestNotAnnotatedKey(affinityKey, affinityKey);
}
}
static class TestAnnotatedKey {
@AffinityKeyMapped
private int annotatedAffinityKey;
private int nonAffinityInfo;
public TestAnnotatedKey(int affinityKey, int nonAffinityInfo) {
this.annotatedAffinityKey = affinityKey;
this.nonAffinityInfo = nonAffinityInfo;
}
public static TestAnnotatedKey of(int affinityKey) {
return new TestAnnotatedKey(affinityKey, affinityKey);
}
}
/**
*
*/
private static class TestCommunicationSpi extends TcpCommunicationSpi {
/** */
@LoggerResource
private IgniteLogger log;
/** */
private List<T2<ClusterNode, GridIoMessage>> blockedMsgs = new ArrayList<>();
/** */
private Map<Class<?>, Set<UUID>> blockCls = new HashMap<>();
/** {@inheritDoc} */
@Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
throws IgniteSpiException {
if (msg instanceof GridIoMessage) {
Object msg0 = ((GridIoMessage)msg).message();
synchronized (this) {
Set<UUID> blockNodes = blockCls.get(msg0.getClass());
if (F.contains(blockNodes, node.id())) {
log.info("Block message [node=" +
node.attribute(IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME) + ", msg=" + msg0 + ']');
blockedMsgs.add(new T2<>(node, (GridIoMessage)msg));
return;
}
}
}
super.sendMessage(node, msg, ackClosure);
}
/**
* @param cls Message class.
* @param nodeId Node ID.
*/
void blockMessages(Class<?> cls, UUID nodeId) {
synchronized (this) {
Set<UUID> set = blockCls.get(cls);
if (set == null) {
set = new HashSet<>();
blockCls.put(cls, set);
}
set.add(nodeId);
}
}
/**
* @param snd Send messages flag.
*/
void stopBlock(boolean snd) {
synchronized (this) {
blockCls.clear();
if (snd) {
for (T2<ClusterNode, GridIoMessage> msg : blockedMsgs) {
ClusterNode node = msg.get1();
log.info("Send blocked message: [node=" +
node.attribute(IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME) +
", msg=" + msg.get2().message() + ']');
super.sendMessage(msg.get1(), msg.get2());
}
}
blockedMsgs.clear();
}
}
}
}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)