You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ignite.apache.org by "Alexei Scherbakov (JIRA)" <ji...@apache.org> on 2018/08/05 16:02:00 UTC
[jira] [Created] (IGNITE-9188) Unexpected eviction leading to data
loss in a scenario with stopping/restarting nodes during rebalancing
Alexei Scherbakov created IGNITE-9188:
-----------------------------------------
Summary: Unexpected eviction leading to data loss in a scenario with stopping/restarting nodes during rebalancing
Key: IGNITE-9188
URL: https://issues.apache.org/jira/browse/IGNITE-9188
Project: Ignite
Issue Type: Bug
Reporter: Alexei Scherbakov
Assignee: Alexei Scherbakov
Fix For: 2.7
Scenario:
1. Split the grid nodes into two groups with distinct partition mappings: one group holds even partitions, the other holds odd ones. Rebalancing of even partitions is triggered only when the number of nodes in the grid exceeds the n/2 threshold.
2. Start n/2 nodes, activate the cluster, and put data into even partitions.
3. Start the other n/2 nodes, change the baseline topology (BLT), and delay rebalancing of even partitions.
4. Stop the newly started nodes before rebalancing is finished.
Expected behavior: partitions in the "even" group keep the OWNING state.
Actual behavior: even partitions are evicted, leading to data loss.
Unit test reproducer:
{noformat}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.Ignite;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.AffinityFunctionContext;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
/**
*
*/
public class CacheLostPartitionsRestoreStateTest extends GridCommonAbstractTest {
/** */
public static final long MB = 1024 * 1024L;
/** */
public static final String GRP_ATTR = "grp";
/** */
public static final int GRIDS_CNT = 6;
/** */
public static final String CACHE_1 = "filled";
/** */
public static final String CACHE_2 = "empty";
/** */
public static final String EVEN_GRP = "event";
/** */
public static final String ODD_GRP = "odd";
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
CacheConfiguration ccfg = new CacheConfiguration("default");
ccfg.setAffinity(new RendezvousAffinityFunction(false, CacheConfiguration.MAX_PARTITIONS_COUNT));
cfg.setCacheConfiguration(ccfg);
cfg.setPeerClassLoadingEnabled(true);
Map<String, Object> attrs = new HashMap<>();
attrs.put(GRP_ATTR, grp(getTestIgniteInstanceIndex(igniteInstanceName)));
cfg.setUserAttributes(attrs);
DataStorageConfiguration memCfg = new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(
new DataRegionConfiguration().setPersistenceEnabled(true).setInitialSize(50 * MB).setMaxSize(50 * MB))
.setWalMode(WALMode.LOG_ONLY);
cfg.setDataStorageConfiguration(memCfg);
cfg.setCacheConfiguration(configuration(CACHE_1), configuration(CACHE_2));
return cfg;
}
/**
* @param name Name.
*/
private CacheConfiguration configuration(String name) {
return new CacheConfiguration(name).
setCacheMode(CacheMode.PARTITIONED).
setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL).
setBackups(2).
setRebalanceBatchSize(1).
setAffinity(new TestAffinityFunction().setPartitions(32));
}
/**
* @param idx Index.
*/
private String grp(int idx) {
return idx < GRIDS_CNT / 2 ? EVEN_GRP : ODD_GRP;
}
/**
* @throws Exception if failed.
*/
public void test() throws Exception {
try {
Ignite ignite = startGridsMultiThreaded(GRIDS_CNT / 2, false);
ignite.cluster().active(true);
awaitPartitionMapExchange();
int blockPartId = 1;
int c = 0;
for (int i = 0; i < 1000; i++) {
if (ignite.affinity(CACHE_1).partition(i) == blockPartId) {
ignite.cache(CACHE_1).put(i, i);
c++;
}
}
assertEquals(c, ignite.cache(CACHE_1).size());
startGridsMultiThreaded(GRIDS_CNT / 2, GRIDS_CNT / 2);
// Prevent rebalancing to new nodes.
for (Ignite ig0 : G.allGrids()) {
TestRecordingCommunicationSpi.spi(ig0).blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
@Override public boolean apply(ClusterNode node, Message message) {
if (message instanceof GridDhtPartitionDemandMessage) {
assertTrue(node.order() <= GRIDS_CNT / 2);
GridDhtPartitionDemandMessage msg = (GridDhtPartitionDemandMessage)message;
return msg.groupId() == CU.cacheId(CACHE_1) || msg.groupId() == CU.cacheId(CACHE_2);
}
return false;
}
});
}
ignite.cluster().setBaselineTopology(GRIDS_CNT);
for (Ignite ig0 : G.allGrids()) {
if (ig0.cluster().localNode().order() <= GRIDS_CNT / 2)
continue;
TestRecordingCommunicationSpi.spi(ig0).waitForBlocked();
}
assertEquals(c, ignite.cache(CACHE_1).size());
assertEquals(c, ignite.cache(CACHE_1).size());
int i = 0;
while(i < GRIDS_CNT / 2) {
stopGrid(GRIDS_CNT / 2 + i);
i++;
}
awaitPartitionMapExchange();
for (Ignite ig : G.allGrids()) {
GridDhtLocalPartition locPart = dht(ig.cache(CACHE_1)).topology().localPartition(blockPartId);
assertNotNull(locPart);
assertTrue(locPart.state() == OWNING);
}
}
finally {
stopAllGrids();
}
}
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
cleanPersistenceDir();
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
cleanPersistenceDir();
}
/** */
public static class TestAffinityFunction extends RendezvousAffinityFunction {
/** */
public TestAffinityFunction() {
}
/** */
public TestAffinityFunction(boolean exclNeighbors) {
super(exclNeighbors);
}
/** */
public TestAffinityFunction(boolean exclNeighbors, int parts) {
super(exclNeighbors, parts);
}
/** */
public TestAffinityFunction(int parts,
@Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) {
super(parts, backupFilter);
}
/** {@inheritDoc} */
@Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) {
int parts = partitions();
List<List<ClusterNode>> assignments = new ArrayList<>(parts);
Map<UUID, Collection<ClusterNode>> neighborhoodCache = isExcludeNeighbors() ?
GridCacheUtils.neighbors(affCtx.currentTopologySnapshot()) : null;
List<ClusterNode> nodes = affCtx.currentTopologySnapshot();
Map<Object, List<ClusterNode>> nodesByGrp = U.newHashMap(2);
for (ClusterNode node : nodes) {
Object grp = node.attribute(GRP_ATTR);
List<ClusterNode> grpNodes = nodesByGrp.get(grp);
if (grpNodes == null)
nodesByGrp.put(grp, (grpNodes = new ArrayList<>()));
grpNodes.add(node);
}
boolean split = nodesByGrp.size() == 2;
for (int i = 0; i < parts; i++) {
List<ClusterNode> partAssignment = assignPartition(i, split ?
nodesByGrp.get(i % 2 == 0 ? EVEN_GRP : ODD_GRP) : nodes,
affCtx.backups(), neighborhoodCache);
assignments.add(partAssignment);
}
return assignments;
}
}
}
{noformat}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)