You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@ignite.apache.org by "Alexei Scherbakov (JIRA)" <ji...@apache.org> on 2018/08/05 16:02:00 UTC

[jira] [Updated] (IGNITE-9188) Unexpected eviction leading to data loss in a scenario with stopping/restarting nodes during rebalancing

     [ https://issues.apache.org/jira/browse/IGNITE-9188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexei Scherbakov updated IGNITE-9188:
--------------------------------------
    Summary: Unexpected eviction leading to data loss in a scenario with stopping/restarting nodes during rebalancing  (was: Unexpected eviction leading to data lost in a scenario with stopping/restarting nodes during rebalancing)

> Unexpected eviction leading to data loss in a scenario with stopping/restarting nodes during rebalancing
> --------------------------------------------------------------------------------------------------------
>
>                 Key: IGNITE-9188
>                 URL: https://issues.apache.org/jira/browse/IGNITE-9188
>             Project: Ignite
>          Issue Type: Bug
>            Reporter: Alexei Scherbakov
>            Assignee: Alexei Scherbakov
>            Priority: Major
>             Fix For: 2.7
>
>
> Scenario:
> 1. Split grid nodes into two groups with distinct partition mapping. One group holds even partitions, the other holds odd partitions. Rebalancing of even partitions is only triggered when the number of nodes in the grid exceeds the n/2 threshold.
> 2. Start n/2 nodes, activate, put data into even partitions.
> 3. Start other n/2 nodes, change BLT, delay rebalancing of even partitions.
> 4. Stop newly started nodes before rebalancing is finished.
> Expected behavior: partitions in the "even" group keep the OWNING state.
> Actual behavior: even partitions are evicted leading to data loss.
> Unit test reproducer:
> {noformat}
> /*
>  * Licensed to the Apache Software Foundation (ASF) under one or more
>  * contributor license agreements.  See the NOTICE file distributed with
>  * this work for additional information regarding copyright ownership.
>  * The ASF licenses this file to You under the Apache License, Version 2.0
>  * (the "License"); you may not use this file except in compliance with
>  * the License.  You may obtain a copy of the License at
>  *
>  *      http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
> package org.apache.ignite.internal.processors.cache.distributed;
> import java.util.ArrayList;
> import java.util.Collection;
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
> import java.util.UUID;
> import org.apache.ignite.Ignite;
> import org.apache.ignite.cache.CacheAtomicityMode;
> import org.apache.ignite.cache.CacheMode;
> import org.apache.ignite.cache.affinity.AffinityFunctionContext;
> import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
> import org.apache.ignite.cluster.ClusterNode;
> import org.apache.ignite.configuration.CacheConfiguration;
> import org.apache.ignite.configuration.DataRegionConfiguration;
> import org.apache.ignite.configuration.DataStorageConfiguration;
> import org.apache.ignite.configuration.IgniteConfiguration;
> import org.apache.ignite.configuration.WALMode;
> import org.apache.ignite.internal.TestRecordingCommunicationSpi;
> import org.apache.ignite.internal.processors.cache.GridCacheUtils;
> import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
> import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
> import org.apache.ignite.internal.util.typedef.G;
> import org.apache.ignite.internal.util.typedef.internal.CU;
> import org.apache.ignite.internal.util.typedef.internal.U;
> import org.apache.ignite.lang.IgniteBiPredicate;
> import org.apache.ignite.plugin.extensions.communication.Message;
> import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
> import org.jetbrains.annotations.Nullable;
> import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
> /**
>  * Reproducer for IGNITE-9188: a partition that is owned by the nodes started first must stay in
>  * {@code OWNING} state on those nodes when the nodes started later are stopped while rebalance demand
>  * messages for the test caches are still blocked.
>  * <p>
>  * Every node carries a group user attribute ({@link #GRP_ATTR}). {@link TestAffinityFunction} uses it to
>  * map even partitions to one group and odd partitions to the other as soon as both groups are present.
>  */
> public class CacheLostPartitionsRestoreStateTest extends GridCommonAbstractTest {
>     /** Number of bytes in one megabyte. */
>     public static final long MB = 1024 * 1024L;
>     /** Name of the node user attribute that holds the node's group. */
>     public static final String GRP_ATTR = "grp";
>     /** Total number of nodes started by the test. */
>     public static final int GRIDS_CNT = 6;
>     /** Name of the cache the test puts data into. */
>     public static final String CACHE_1 = "filled";
>     /** Name of the second configured cache; the test never writes to it. */
>     public static final String CACHE_2 = "empty";
>     /** Group of the first {@code GRIDS_CNT / 2} nodes. The literal is only used as an opaque label. */
>     public static final String EVEN_GRP = "event";
>     /** Group of the remaining nodes. */
>     public static final String ODD_GRP = "odd";
>     /** {@inheritDoc} */
>     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
>         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
>         // The recording SPI is what allows the test to block rebalance demand messages.
>         cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
>         cfg.setPeerClassLoadingEnabled(true);
>         // TestAffinityFunction reads this attribute to decide which nodes may own a partition.
>         Map<String, Object> attrs = new HashMap<>();
>         attrs.put(GRP_ATTR, grp(getTestIgniteInstanceIndex(igniteInstanceName)));
>         cfg.setUserAttributes(attrs);
>         DataRegionConfiguration regionCfg = new DataRegionConfiguration()
>             .setPersistenceEnabled(true)
>             .setInitialSize(50 * MB)
>             .setMaxSize(50 * MB);
>         DataStorageConfiguration memCfg = new DataStorageConfiguration()
>             .setDefaultDataRegionConfiguration(regionCfg)
>             .setWalMode(WALMode.LOG_ONLY);
>         cfg.setDataStorageConfiguration(memCfg);
>         // Only these two caches are configured; an earlier "default" cache configuration was removed
>         // because this call replaced it anyway.
>         cfg.setCacheConfiguration(configuration(CACHE_1), configuration(CACHE_2));
>         return cfg;
>     }
>     /**
>      * Builds the configuration shared by both test caches: partitioned, transactional, 2 backups,
>      * rebalance batch size 1 and a {@link TestAffinityFunction} with 32 partitions.
>      *
>      * @param name Cache name.
>      * @return Cache configuration.
>      */
>     private CacheConfiguration<Object, Object> configuration(String name) {
>         CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(name);
>         ccfg.setCacheMode(CacheMode.PARTITIONED);
>         ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
>         ccfg.setBackups(2);
>         ccfg.setRebalanceBatchSize(1);
>         ccfg.setAffinity(new TestAffinityFunction().setPartitions(32));
>         return ccfg;
>     }
>     /**
>      * Maps a node index to its group: the first half of the nodes belongs to {@link #EVEN_GRP},
>      * the second half to {@link #ODD_GRP}.
>      *
>      * @param idx Node index.
>      * @return Group name.
>      */
>     private String grp(int idx) {
>         return idx < GRIDS_CNT / 2 ? EVEN_GRP : ODD_GRP;
>     }
>     /**
>      * Starts half of the nodes, fills one partition, starts the other half with rebalancing blocked,
>      * stops the second half again and checks that the partition is still {@code OWNING} on every
>      * remaining node.
>      *
>      * @throws Exception if failed.
>      */
>     public void test() throws Exception {
>         try {
>             Ignite ignite = startGridsMultiThreaded(GRIDS_CNT / 2, false);
>             ignite.cluster().active(true);
>             awaitPartitionMapExchange();
>             int blockPartId = 1;
>             // Put only keys that belong to the partition under test and remember how many there are.
>             int c = 0;
>             for (int i = 0; i < 1000; i++) {
>                 if (ignite.affinity(CACHE_1).partition(i) == blockPartId) {
>                     ignite.cache(CACHE_1).put(i, i);
>                     c++;
>                 }
>             }
>             assertEquals(c, ignite.cache(CACHE_1).size());
>             startGridsMultiThreaded(GRIDS_CNT / 2, GRIDS_CNT / 2);
>             // Prevent rebalancing to new nodes.
>             for (Ignite ig0 : G.allGrids()) {
>                 TestRecordingCommunicationSpi.spi(ig0).blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
>                     @Override public boolean apply(ClusterNode node, Message message) {
>                         if (message instanceof GridDhtPartitionDemandMessage) {
>                             // NOTE(review): presumably 'node' is the message destination, i.e. demand
>                             // messages are expected to go only to the nodes started first - confirm.
>                             assertTrue(node.order() <= GRIDS_CNT / 2);
>                             GridDhtPartitionDemandMessage msg = (GridDhtPartitionDemandMessage)message;
>                             return msg.groupId() == CU.cacheId(CACHE_1) || msg.groupId() == CU.cacheId(CACHE_2);
>                         }
>                         return false;
>                     }
>                 });
>             }
>             ignite.cluster().setBaselineTopology(GRIDS_CNT);
>             // Wait until every node started in the second batch has a blocked message.
>             for (Ignite ig0 : G.allGrids()) {
>                 if (ig0.cluster().localNode().order() <= GRIDS_CNT / 2)
>                     continue;
>                 TestRecordingCommunicationSpi.spi(ig0).waitForBlocked();
>             }
>             assertEquals(c, ignite.cache(CACHE_1).size());
>             // Stop the second half of the grid while its demand messages are still blocked.
>             for (int idx = GRIDS_CNT / 2; idx < GRIDS_CNT; idx++)
>                 stopGrid(idx);
>             awaitPartitionMapExchange();
>             for (Ignite ig : G.allGrids()) {
>                 GridDhtLocalPartition locPart = dht(ig.cache(CACHE_1)).topology().localPartition(blockPartId);
>                 assertNotNull("Missing local partition " + blockPartId + " on " + ig.name(), locPart);
>                 assertEquals("Unexpected state of partition " + blockPartId + " on " + ig.name(),
>                     OWNING, locPart.state());
>             }
>         }
>         finally {
>             stopAllGrids();
>         }
>     }
>     /** {@inheritDoc} */
>     @Override protected void beforeTest() throws Exception {
>         cleanPersistenceDir();
>     }
>     /** {@inheritDoc} */
>     @Override protected void afterTest() throws Exception {
>         cleanPersistenceDir();
>     }
>     /**
>      * Rendezvous affinity function that restricts partition owners by node group: when the topology
>      * contains nodes of exactly the two groups {@link #EVEN_GRP} and {@link #ODD_GRP}, even partitions
>      * are assigned among {@link #EVEN_GRP} nodes and odd partitions among {@link #ODD_GRP} nodes.
>      * In any other topology every node is a candidate for every partition.
>      */
>     public static class TestAffinityFunction extends RendezvousAffinityFunction {
>         /** Default constructor. */
>         public TestAffinityFunction() {
>         }
>         /**
>          * @param exclNeighbors Passed through to the {@link RendezvousAffinityFunction} constructor.
>          */
>         public TestAffinityFunction(boolean exclNeighbors) {
>             super(exclNeighbors);
>         }
>         /**
>          * @param exclNeighbors Passed through to the {@link RendezvousAffinityFunction} constructor.
>          * @param parts Passed through to the {@link RendezvousAffinityFunction} constructor.
>          */
>         public TestAffinityFunction(boolean exclNeighbors, int parts) {
>             super(exclNeighbors, parts);
>         }
>         /**
>          * @param parts Passed through to the {@link RendezvousAffinityFunction} constructor.
>          * @param backupFilter Passed through to the {@link RendezvousAffinityFunction} constructor.
>          */
>         public TestAffinityFunction(int parts,
>             @Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) {
>             super(parts, backupFilter);
>         }
>         /** {@inheritDoc} */
>         @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) {
>             int parts = partitions();
>             List<ClusterNode> nodes = affCtx.currentTopologySnapshot();
>             Map<UUID, Collection<ClusterNode>> neighborhoodCache = isExcludeNeighbors() ?
>                 GridCacheUtils.neighbors(nodes) : null;
>             // Bucket the current topology by the value of the group attribute.
>             Map<Object, List<ClusterNode>> nodesByGrp = U.newHashMap(2);
>             for (ClusterNode node : nodes) {
>                 Object grp = node.attribute(GRP_ATTR);
>                 List<ClusterNode> grpNodes = nodesByGrp.get(grp);
>                 if (grpNodes == null) {
>                     grpNodes = new ArrayList<>();
>                     nodesByGrp.put(grp, grpNodes);
>                 }
>                 grpNodes.add(node);
>             }
>             // Split only when the two buckets are the expected groups. Two buckets with any other
>             // key (for example a node without the attribute) would otherwise yield a null node list.
>             boolean split = nodesByGrp.size() == 2 &&
>                 nodesByGrp.containsKey(EVEN_GRP) && nodesByGrp.containsKey(ODD_GRP);
>             List<List<ClusterNode>> assignments = new ArrayList<>(parts);
>             for (int part = 0; part < parts; part++) {
>                 List<ClusterNode> candidates = nodes;
>                 if (split)
>                     candidates = nodesByGrp.get(part % 2 == 0 ? EVEN_GRP : ODD_GRP);
>                 assignments.add(assignPartition(part, candidates, affCtx.backups(), neighborhoodCache));
>             }
>             return assignments;
>         }
>     }
> }
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)