You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ignite.apache.org by "Alexei Scherbakov (JIRA)" <ji...@apache.org> on 2018/08/05 16:02:00 UTC

[jira] [Created] (IGNITE-9188) Unexpected eviction leading to data lost in a scenario with stopping/restarting nodes during rebalancing

Alexei Scherbakov created IGNITE-9188:
-----------------------------------------

             Summary: Unexpected eviction leading to data lost in a scenario with stopping/restarting nodes during rebalancing
                 Key: IGNITE-9188
                 URL: https://issues.apache.org/jira/browse/IGNITE-9188
             Project: Ignite
          Issue Type: Bug
            Reporter: Alexei Scherbakov
            Assignee: Alexei Scherbakov
             Fix For: 2.7


Scenario:

1. Split the grid nodes into two groups with distinct partition mappings: one group holds even partitions, the other holds odd ones. Rebalancing of even partitions is triggered only when the number of nodes in the grid exceeds the n/2 threshold.

2. Start n/2 nodes, activate the cluster, and put data into even partitions.

3. Start the other n/2 nodes, change the baseline topology (BLT), and delay rebalancing of even partitions.

4. Stop newly started nodes before rebalancing is finished.

Expected behavior: partitions in the "even" group keep the OWNING state.

Actual behavior: even partitions are evicted, leading to data loss.

Unit test reproducer:

{noformat}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.cache.distributed;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.Ignite;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.AffinityFunctionContext;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;

/**
 *
 */
/**
 * Reproducer for IGNITE-9188: partitions owned by the initial baseline nodes must remain in {@code OWNING} state
 * when nodes that were added to the baseline are stopped before rebalancing to them has finished.
 * <p>
 * Nodes are split into two groups via the {@link #GRP_ATTR} user attribute (see {@link #grp(int)}). While nodes of
 * only one group take part in affinity calculation, {@link TestAffinityFunction} assigns every partition among all
 * nodes; once both groups are present, even partitions go to {@link #EVEN_GRP} nodes and odd partitions go to
 * {@link #ODD_GRP} nodes.
 */
public class CacheLostPartitionsRestoreStateTest extends GridCommonAbstractTest {
    /** One megabyte, in bytes. */
    public static final long MB = 1024 * 1024L;

    /** Name of the user attribute holding the node's group name. */
    public static final String GRP_ATTR = "grp";

    /** Total number of server nodes: the first half belongs to {@link #EVEN_GRP}, the second to {@link #ODD_GRP}. */
    public static final int GRIDS_CNT = 6;

    /** Cache that is populated by the test. */
    public static final String CACHE_1 = "filled";

    /** Cache that is left empty. */
    public static final String CACHE_2 = "empty";

    /** Group of nodes that receives even partitions once both groups are present. */
    public static final String EVEN_GRP = "even";

    /** Group of nodes that receives odd partitions once both groups are present. */
    public static final String ODD_GRP = "odd";

    /** Number of partitions in each test cache. */
    private static final int PARTS_CNT = 32;

    /** Number of candidate keys probed when populating the tested partition. */
    private static final int KEYS_CNT = 1000;

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

        // Recording SPI is required to block rebalancing (demand) messages in the test.
        cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());

        cfg.setPeerClassLoadingEnabled(true);

        Map<String, Object> attrs = new HashMap<>();

        // The group attribute drives the partition split performed by TestAffinityFunction.
        attrs.put(GRP_ATTR, grp(getTestIgniteInstanceIndex(igniteInstanceName)));

        cfg.setUserAttributes(attrs);

        DataStorageConfiguration memCfg = new DataStorageConfiguration()
            .setDefaultDataRegionConfiguration(
                new DataRegionConfiguration().setPersistenceEnabled(true).setInitialSize(50 * MB).setMaxSize(50 * MB))
            .setWalMode(WALMode.LOG_ONLY);

        cfg.setDataStorageConfiguration(memCfg);

        // Only the two test caches are configured.
        cfg.setCacheConfiguration(configuration(CACHE_1), configuration(CACHE_2));

        return cfg;
    }

    /**
     * Creates a transactional partitioned cache configuration with two backups and the group-aware affinity.
     *
     * @param name Cache name.
     * @return Cache configuration.
     */
    private CacheConfiguration<Integer, Integer> configuration(String name) {
        return new CacheConfiguration<Integer, Integer>(name).
            setCacheMode(CacheMode.PARTITIONED).
            setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL).
            setBackups(2).
            setRebalanceBatchSize(1).
            setAffinity(new TestAffinityFunction().setPartitions(PARTS_CNT));
    }

    /**
     * Maps a node index to its group: the first {@code GRIDS_CNT / 2} nodes form {@link #EVEN_GRP}, the rest form
     * {@link #ODD_GRP}.
     *
     * @param idx Node index.
     * @return Group name.
     */
    private String grp(int idx) {
        return idx < GRIDS_CNT / 2 ? EVEN_GRP : ODD_GRP;
    }

    /**
     * Populates a single partition on the initial nodes. It then extends the baseline so that this partition is
     * reassigned to the newly started nodes, and blocks rebalancing towards them. Finally it stops the new nodes and
     * checks that the initial nodes still own the partition.
     *
     * @throws Exception if failed.
     */
    public void test() throws Exception {
        try {
            Ignite ignite = startGridsMultiThreaded(GRIDS_CNT / 2, false);

            ignite.cluster().active(true);

            awaitPartitionMapExchange();

            // Partition 1 is odd: after the baseline is extended, TestAffinityFunction assigns it to ODD_GRP nodes,
            // i.e. to the nodes started later in this test.
            int blockPartId = 1;

            int c = 0;

            for (int i = 0; i < KEYS_CNT; i++) {
                if (ignite.affinity(CACHE_1).partition(i) == blockPartId) {
                    ignite.cache(CACHE_1).put(i, i);

                    c++;
                }
            }

            assertEquals(c, ignite.cache(CACHE_1).size());

            startGridsMultiThreaded(GRIDS_CNT / 2, GRIDS_CNT / 2);

            // Prevent rebalancing to new nodes: block demand messages for both test cache groups.
            for (Ignite ig0 : G.allGrids()) {
                TestRecordingCommunicationSpi.spi(ig0).blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
                    @Override public boolean apply(ClusterNode node, Message msg) {
                        if (!(msg instanceof GridDhtPartitionDemandMessage))
                            return false;

                        // Demand messages are expected to target only the initial nodes (the suppliers).
                        // NOTE(review): this assertion runs inside the communication SPI, not the test thread.
                        assertTrue(node.order() <= GRIDS_CNT / 2);

                        int grpId = ((GridDhtPartitionDemandMessage)msg).groupId();

                        return grpId == CU.cacheId(CACHE_1) || grpId == CU.cacheId(CACHE_2);
                    }
                });
            }

            ignite.cluster().setBaselineTopology(GRIDS_CNT);

            // Make sure every new node has actually tried to start rebalancing (and got blocked).
            for (Ignite ig0 : G.allGrids()) {
                if (ig0.cluster().localNode().order() <= GRIDS_CNT / 2)
                    continue;

                TestRecordingCommunicationSpi.spi(ig0).waitForBlocked();
            }

            assertEquals(c, ignite.cache(CACHE_1).size());

            // Stop all newly started nodes before rebalancing to them has completed.
            for (int idx = GRIDS_CNT / 2; idx < GRIDS_CNT; idx++)
                stopGrid(idx);

            awaitPartitionMapExchange();

            // Only the initial nodes remain; each of them must still own the populated partition.
            for (Ignite ig : G.allGrids()) {
                GridDhtLocalPartition locPart = dht(ig.cache(CACHE_1)).topology().localPartition(blockPartId);

                assertNotNull("Partition is missing [node=" + ig.name() + ", part=" + blockPartId + ']', locPart);

                assertEquals("Unexpected partition state [node=" + ig.name() + ", part=" + blockPartId + ']',
                    OWNING, locPart.state());
            }
        }
        finally {
            stopAllGrids();
        }
    }

    /** {@inheritDoc} */
    @Override protected void beforeTest() throws Exception {
        cleanPersistenceDir();
    }

    /** {@inheritDoc} */
    @Override protected void afterTest() throws Exception {
        cleanPersistenceDir();
    }

    /**
     * Rendezvous affinity that is aware of node groups (see {@link #GRP_ATTR}).
     * <p>
     * When the topology snapshot contains nodes of both {@link #EVEN_GRP} and {@link #ODD_GRP}, even partitions are
     * assigned among {@link #EVEN_GRP} nodes and odd partitions among {@link #ODD_GRP} nodes. Otherwise every
     * partition is assigned among all nodes of the snapshot.
     */
    public static class TestAffinityFunction extends RendezvousAffinityFunction {
        /** Default constructor. */
        public TestAffinityFunction() {
        }

        /**
         * @param exclNeighbors Exclude neighbors flag.
         */
        public TestAffinityFunction(boolean exclNeighbors) {
            super(exclNeighbors);
        }

        /**
         * @param exclNeighbors Exclude neighbors flag.
         * @param parts Number of partitions.
         */
        public TestAffinityFunction(boolean exclNeighbors, int parts) {
            super(exclNeighbors, parts);
        }

        /**
         * @param parts Number of partitions.
         * @param backupFilter Optional backup filter.
         */
        public TestAffinityFunction(int parts,
            @Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) {
            super(parts, backupFilter);
        }

        /** {@inheritDoc} */
        @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) {
            int parts = partitions();

            List<List<ClusterNode>> assignments = new ArrayList<>(parts);

            List<ClusterNode> nodes = affCtx.currentTopologySnapshot();

            Map<UUID, Collection<ClusterNode>> neighborhoodCache = isExcludeNeighbors() ?
                GridCacheUtils.neighbors(nodes) : null;

            Map<Object, List<ClusterNode>> nodesByGrp = U.newHashMap(2);

            for (ClusterNode node : nodes) {
                Object grp = node.attribute(GRP_ATTR);

                List<ClusterNode> grpNodes = nodesByGrp.get(grp);

                if (grpNodes == null)
                    nodesByGrp.put(grp, (grpNodes = new ArrayList<>()));

                grpNodes.add(node);
            }

            // Split only when both named groups are present, so that the per-parity node list is never null.
            boolean split = nodesByGrp.containsKey(EVEN_GRP) && nodesByGrp.containsKey(ODD_GRP);

            for (int i = 0; i < parts; i++) {
                List<ClusterNode> candidates = split ? nodesByGrp.get(i % 2 == 0 ? EVEN_GRP : ODD_GRP) : nodes;

                assignments.add(assignPartition(i, candidates, affCtx.backups(), neighborhoodCache));
            }

            return assignments;
        }
    }
}
{noformat}




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)