You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dg...@apache.org on 2019/06/19 09:04:36 UTC

[ignite] branch master updated: IGNITE-11878 Rebuild index skips MOVING partitions when historical re balance - Fixed #6578.

This is an automated email from the ASF dual-hosted git repository.

dgovorukhin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d0d87f  IGNITE-11878 Rebuild index skips MOVING partitions when historical re balance - Fixed #6578.
6d0d87f is described below

commit 6d0d87fa1e2db8e08dd68b9916bbebb454cb7c60
Author: mstepachev <ma...@gmail.com>
AuthorDate: Wed Jun 19 12:04:26 2019 +0300

    IGNITE-11878 Rebuild index skips MOVING partitions when historical re balance - Fixed #6578.
    
    Signed-off-by: Dmitriy Govorukhin <dm...@gmail.com>
---
 .../query/schema/SchemaIndexCacheVisitorImpl.java  |   3 +-
 .../RebuildIndexWithHistoricalRebalanceTest.java   | 304 +++++++++++++++++++++
 .../testsuites/IgnitePdsWithIndexingTestSuite.java |   2 +
 3 files changed, 308 insertions(+), 1 deletion(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
index e2f660a..7f57043 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.thread.IgniteThread;
 
 import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.EVICTED;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.RENTING;
 
@@ -178,7 +179,7 @@ public class SchemaIndexCacheVisitorImpl implements SchemaIndexCacheVisitor {
         boolean reserved = false;
 
         if (part != null && part.state() != EVICTED)
-            reserved = (part.state() == OWNING || part.state() == RENTING) && part.reserve();
+            reserved = (part.state() == OWNING || part.state() == RENTING || part.state() == MOVING) && part.reserve();
 
         if (!reserved)
             return;
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/RebuildIndexWithHistoricalRebalanceTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/RebuildIndexWithHistoricalRebalanceTest.java
new file mode 100644
index 0000000..ead112a
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/RebuildIndexWithHistoricalRebalanceTest.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.database;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.cache.QueryIndexType;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.eviction.fifo.FifoEvictionPolicy;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.visor.verify.ValidateIndexesClosure;
+import org.apache.ignite.internal.visor.verify.VisorValidateIndexesJobResult;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.INDEX_FILE_NAME;
+
+/**
+ * Rebuild index after index.bin remove, when partition is moving.
+ */
+public class RebuildIndexWithHistoricalRebalanceTest extends GridCommonAbstractTest {
+    /** Rebalance cache name. */
+    private static final String CACHE_NAME = "cache_name";
+
+    /** Supply message latch. */
+    private static final AtomicReference<CountDownLatch> SUPPLY_MESSAGE_LATCH = new AtomicReference<>();
+
+    /** Test logger. */
+    private final ListeningTestLogger log = new ListeningTestLogger(false, super.log);
+
+    /**
+     * User key.
+     */
+    private static class UserKey {
+        /** A. */
+        private int account;
+
+        /**
+         * @param a A.
+         */
+        public UserKey(int account) {
+            this.account = account;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return "UserKey{" +
+                "account=" + account +
+                '}';
+        }
+    }
+
+    /**
+     * User value.
+     */
+    private static class UserValue {
+        /** balance. */
+        private int balance;
+
+        /**
+         * @param balance balance.
+         */
+        public UserValue(int balance) {
+            this.balance = balance;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return "UserValue{" +
+                "balance=" + balance +
+                '}';
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setConsistentId(gridName);
+        cfg.setGridLogger(log);
+
+        QueryEntity qryEntity = new QueryEntity();
+        qryEntity.setKeyType(UserKey.class.getName());
+        qryEntity.setValueType(UserValue.class.getName());
+        qryEntity.setKeyFields(new HashSet<>(Arrays.asList("account")));
+
+        LinkedHashMap<String, String> fields = new LinkedHashMap<>();
+        fields.put("account", "java.lang.Integer");
+        fields.put("balance", "java.lang.Integer");
+        qryEntity.setFields(fields);
+
+        QueryIndex idx1 = new QueryIndex();
+        idx1.setName("IDX_1");
+        idx1.setIndexType(QueryIndexType.SORTED);
+        LinkedHashMap<String, Boolean> idxFields = new LinkedHashMap<>();
+        idxFields.put("account", false);
+        idxFields.put("balance", false);
+        idx1.setFields(idxFields);
+
+        QueryIndex idx2 = new QueryIndex();
+        idx2.setName("IDX_2");
+        idx2.setIndexType(QueryIndexType.SORTED);
+        idxFields = new LinkedHashMap<>();
+        idxFields.put("balance", false);
+        idx2.setFields(idxFields);
+
+        qryEntity.setIndexes(Arrays.asList(idx1, idx2));
+
+        cfg.setCacheConfiguration(new CacheConfiguration<UserKey, UserValue>()
+            .setName(CACHE_NAME)
+            .setBackups(2)
+            .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+            .setCacheMode(REPLICATED)
+            .setWriteSynchronizationMode(FULL_SYNC)
+            .setOnheapCacheEnabled(true)
+            .setEvictionPolicy(new FifoEvictionPolicy(1000))
+            .setAffinity(new RendezvousAffinityFunction(false, 1))
+            .setQueryEntities(Collections.singleton(qryEntity)));
+
+        cfg.setDataStorageConfiguration(
+            new DataStorageConfiguration()
+                .setDefaultDataRegionConfiguration(
+                    new DataRegionConfiguration()
+                        .setPersistenceEnabled(true)
+                        .setInitialSize(200L * 1024 * 1024)
+                        .setMaxSize(200L * 1024 * 1024)
+                )
+        );
+
+        cfg.setCommunicationSpi(new RebalanceBlockingSPI());
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        log.clearListeners();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /**
+     *
+     */
+    @Test
+    @WithSystemProperty(key = IGNITE_PDS_WAL_REBALANCE_THRESHOLD, value = "0") // Use only historical rebalance
+    public void shouldRebuldIndexForMovingPartitionWithHistoricalRebalance() throws Exception {
+        IgniteEx node1 = startGrid(0);
+        startGrid(1);
+
+        node1.cluster().active(true);
+
+        IgniteCache<UserKey, UserValue> cache = node1.getOrCreateCache(CACHE_NAME);
+
+        cache.put(new UserKey(1), new UserValue(333));
+
+        stopGrid(1);
+
+        cache.put(new UserKey(2), new UserValue(555));
+
+        SUPPLY_MESSAGE_LATCH.set(new CountDownLatch(1));
+
+        removeIndexBin(1);
+
+        LogListener rebuildLsnr = finishIndexRebuildLsnr(CACHE_NAME);
+
+        IgniteEx node2 = startGrid(1);
+
+        assertTrue(GridTestUtils.waitForCondition(rebuildLsnr::check, 10_000));
+
+        SUPPLY_MESSAGE_LATCH.get().countDown();
+
+        awaitPartitionMapExchange();
+
+        ValidateIndexesClosure clo = new ValidateIndexesClosure(Collections.singleton(CACHE_NAME), 0, 0);
+        node2.context().resource().injectGeneric(clo);
+        VisorValidateIndexesJobResult res = clo.call();
+
+        assertFalse(res.hasIssues());
+    }
+
+    /** */
+    private LogListener finishIndexRebuildLsnr(String cacheName) {
+        LogListener lsnr = LogListener.matches(s -> s.startsWith("Finished indexes rebuilding for cache [name=" + cacheName)).times(1).build();
+
+        log.registerListener(lsnr);
+
+        return lsnr;
+    }
+
+    /** */
+    private void removeIndexBin(int nodeId) throws IgniteCheckedException {
+        U.delete(
+            U.resolveWorkDirectory(
+                U.defaultWorkDirectory(),
+                "db/" + U.maskForFileName(getTestIgniteInstanceName(nodeId)) + "/cache-" + CACHE_NAME + "/" + INDEX_FILE_NAME,
+                false
+            )
+        );
+    }
+
+    /** */
+    private static class RebalanceBlockingSPI extends TcpCommunicationSpi {
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
+            if (msg instanceof GridIoMessage && ((GridIoMessage)msg).message() instanceof GridDhtPartitionSupplyMessage) {
+                int grpId = ((GridCacheGroupIdMessage)((GridIoMessage)msg).message()).groupId();
+
+                if (grpId == CU.cacheId(CACHE_NAME)) {
+                    CountDownLatch latch0 = SUPPLY_MESSAGE_LATCH.get();
+
+                    if (latch0 != null)
+                        try {
+                            latch0.await();
+                        }
+                        catch (InterruptedException ex) {
+                            throw new IgniteException(ex);
+                        }
+                }
+            }
+
+            super.sendMessage(node, msg);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, Message msg,
+            IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
+            if (msg instanceof GridIoMessage && ((GridIoMessage)msg).message() instanceof GridDhtPartitionSupplyMessage) {
+                int grpId = ((GridCacheGroupIdMessage)((GridIoMessage)msg).message()).groupId();
+
+                if (grpId == CU.cacheId(CACHE_NAME)) {
+                    CountDownLatch latch0 = SUPPLY_MESSAGE_LATCH.get();
+
+                    if (latch0 != null)
+                        try {
+                            latch0.await();
+                        }
+                        catch (InterruptedException ex) {
+                            throw new IgniteException(ex);
+                        }
+                }
+            }
+
+            super.sendMessage(node, msg, ackC);
+        }
+    }
+}
\ No newline at end of file
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
index 16b762c..984d17e 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.processors.database.IgniteDbSingleNodeWithInde
 import org.apache.ignite.internal.processors.database.IgnitePersistentStoreQueryWithMultipleClassesPerCacheTest;
 import org.apache.ignite.internal.processors.database.IgnitePersistentStoreSchemaLoadTest;
 import org.apache.ignite.internal.processors.database.IgniteTwoRegionsRebuildIndexTest;
+import org.apache.ignite.internal.processors.database.RebuildIndexWithHistoricalRebalanceTest;
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
 
@@ -42,6 +43,7 @@ import org.junit.runners.Suite;
     IgnitePersistentStoreQueryWithMultipleClassesPerCacheTest.class,
     IgniteTwoRegionsRebuildIndexTest.class,
     IgniteTcBotInitNewPageTest.class,
+    RebuildIndexWithHistoricalRebalanceTest.class,
     IndexingMultithreadedLoadContinuousRestartTest.class
 })
 public class IgnitePdsWithIndexingTestSuite {