You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2022/08/17 16:54:12 UTC
[ignite] branch master updated: IGNITE-17531 Tests covers consistency repair at inconsistent cluster with nonfinalized counters (#10199)
This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 5f15c2d4451 IGNITE-17531 Tests covers consistency repair at inconsistent cluster with nonfinalized counters (#10199)
5f15c2d4451 is described below
commit 5f15c2d44518f52159731a116721ce963881fcb8
Author: Anton Vinogradov <av...@apache.org>
AuthorDate: Wed Aug 17 19:54:05 2022 +0300
IGNITE-17531 Tests covers consistency repair at inconsistent cluster with nonfinalized counters (#10199)
---
.../testsuites/IgniteControlUtilityTestSuite.java | 2 +
.../GridCommandHandlerConsistencyCountersTest.java | 586 +++++++++++++++++++++
.../apache/ignite/util/GridCommandHandlerTest.java | 162 ------
.../cache/PartitionUpdateCounterMvccImpl.java | 4 +-
.../cache/PartitionUpdateCounterTrackingImpl.java | 96 ++--
...IgnitePdsSpuriousRebalancingOnNodeJoinTest.java | 2 +-
.../apache/ignite/testframework/GridTestUtils.java | 8 +-
7 files changed, 644 insertions(+), 216 deletions(-)
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java
index efbd4ce577d..d102c06ce9f 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java
@@ -28,6 +28,7 @@ import org.apache.ignite.util.GridCommandHandlerCheckIndexesInlineSizeTest;
import org.apache.ignite.util.GridCommandHandlerClusterByClassTest;
import org.apache.ignite.util.GridCommandHandlerClusterByClassWithSSLTest;
import org.apache.ignite.util.GridCommandHandlerConsistencyBinaryTest;
+import org.apache.ignite.util.GridCommandHandlerConsistencyCountersTest;
import org.apache.ignite.util.GridCommandHandlerConsistencyRepairCorrectnessAtomicTest;
import org.apache.ignite.util.GridCommandHandlerConsistencyRepairCorrectnessTransactionalTest;
import org.apache.ignite.util.GridCommandHandlerConsistencySensitiveTest;
@@ -98,6 +99,7 @@ import org.junit.runners.Suite;
GridCommandHandlerDefragmentationTest.class,
GridCommandHandlerConsistencyTest.class,
+ GridCommandHandlerConsistencyCountersTest.class,
GridCommandHandlerConsistencyBinaryTest.class,
GridCommandHandlerConsistencySensitiveTest.class,
GridCommandHandlerConsistencyRepairCorrectnessTransactionalTest.class,
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencyCountersTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencyCountersTest.java
new file mode 100644
index 00000000000..140a0f0000e
--- /dev/null
+++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencyCountersTest.java
@@ -0,0 +1,586 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.OpenOption;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cache.ReadRepairStrategy;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.commandline.consistency.ConsistencyCommand;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.apache.ignite.transactions.TransactionHeuristicException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.ReadRepairStrategy.PRIMARY;
+import static org.apache.ignite.cache.ReadRepairStrategy.RELATIVE_MAJORITY;
+import static org.apache.ignite.cache.ReadRepairStrategy.REMOVE;
+import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK;
+import static org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager.DFLT_WAL_MARGIN_FOR_ATOMIC_CACHE_HISTORICAL_REBALANCE;
+import static org.apache.ignite.internal.visor.consistency.VisorConsistencyRepairTask.CONSISTENCY_VIOLATIONS_FOUND;
+import static org.apache.ignite.internal.visor.consistency.VisorConsistencyRepairTask.NOTHING_FOUND;
+import static org.apache.ignite.testframework.GridTestUtils.assertContains;
+import static org.apache.ignite.testframework.GridTestUtils.assertNotContains;
+import static org.apache.ignite.testframework.LogListener.matches;
+
+/**
+ *
+ */
+@RunWith(Parameterized.class)
+public class GridCommandHandlerConsistencyCountersTest extends GridCommandHandlerClusterPerMethodAbstractTest {
+ /** */
+ @Parameterized.Parameters(name = "strategy={0}, reuse={1}, historical={2}, atomicity={3}")
+ public static Iterable<Object[]> data() {
+ List<Object[]> res = new ArrayList<>();
+
+ for (ReadRepairStrategy strategy : ReadRepairStrategy.values()) {
+ for (boolean reuse : new boolean[] {false, true}) {
+ for (boolean historical : new boolean[] {false, true}) {
+ for (CacheAtomicityMode atomicityMode : new CacheAtomicityMode[] {ATOMIC, TRANSACTIONAL})
+ res.add(new Object[] {strategy, reuse, historical, atomicityMode});
+ }
+ }
+ }
+
+ return res;
+ }
+
+ /**
+ * ReadRepair strategy
+ */
+ @Parameterized.Parameter
+ public ReadRepairStrategy strategy;
+
+ /**
+ * When true, updates will reuse already existing keys.
+ */
+ @Parameterized.Parameter(1)
+ public boolean reuseKeys;
+
+ /**
+ * When true, historical rebalance will be used instead of full.
+ */
+ @Parameterized.Parameter(2)
+ public boolean historical;
+
+ /**
+ * Cache atomicity mode
+ */
+ @Parameterized.Parameter(3)
+ public CacheAtomicityMode atomicityMode;
+
+ /** Listening logger. */
+ protected final ListeningTestLogger listeningLog = new ListeningTestLogger(log);
+
+ /** File IO blocked flag. */
+ private static volatile boolean ioBlocked;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setGridLogger(listeningLog);
+
+ cfg.getDataStorageConfiguration().setFileIOFactory(
+ new BlockableFileIOFactory(cfg.getDataStorageConfiguration().getFileIOFactory()));
+
+ cfg.getDataStorageConfiguration().setWalMode(WALMode.FSYNC); // Allows to use special IO at WAL as well.
+
+ cfg.setFailureHandler(new StopNodeFailureHandler()); // Helps to kill nodes on stop with disabled IO.
+
+ return cfg;
+ }
+
+ /**
+ *
+ */
+ private static class BlockableFileIOFactory implements FileIOFactory {
+ /** IO Factory. */
+ private final FileIOFactory factory;
+
+ /**
+ * @param factory Factory.
+ */
+ public BlockableFileIOFactory(FileIOFactory factory) {
+ this.factory = factory;
+ }
+
+ /** {@inheritDoc} */
+ @Override public FileIO create(File file, OpenOption... modes) throws IOException {
+ return new FileIODecorator(factory.create(file, modes)) {
+ @Override public int write(ByteBuffer srcBuf) throws IOException {
+ if (ioBlocked)
+ throw new IOException();
+
+ return super.write(srcBuf);
+ }
+
+ @Override public int write(ByteBuffer srcBuf, long position) throws IOException {
+ if (ioBlocked)
+ throw new IOException();
+
+ return super.write(srcBuf, position);
+ }
+
+ @Override public int write(byte[] buf, int off, int len) throws IOException {
+ if (ioBlocked)
+ throw new IOException();
+
+ return super.write(buf, off, len);
+ }
+ };
+ }
+ }
+
+ /**
+ * Checks counters and behaviour on crash recovery.
+ */
+ @Test
+ public void testCountersOnCrachRecovery() throws Exception {
+ int nodes = 3;
+ int backupNodes = nodes - 1;
+
+ IgniteEx ignite = startGrids(nodes);
+
+ ignite.cluster().state(ClusterState.ACTIVE);
+
+ IgniteCache<Object, Object> cache = ignite.createCache(new CacheConfiguration<>()
+ .setAffinity(new RendezvousAffinityFunction(false, 1))
+ .setBackups(backupNodes)
+ .setName(DEFAULT_CACHE_NAME)
+ .setAtomicityMode(atomicityMode)
+ .setWriteSynchronizationMode(FULL_SYNC) // Allows to be sure that all messages are sent when put succeed.
+ .setReadFromBackup(true)); // Allows to check values on backups.
+
+ int updateCnt = 0;
+
+ // Initial preloading.
+ for (int i = 0; i < 2_000; i++) { // Enough to have historical rebalance when needed.
+ cache.put(i, i);
+
+ updateCnt++;
+ }
+
+ // Trick to have historical rebalance on cluster recovery (decreases percent of updates in comparison to cache size).
+ if (historical) {
+ stopAllGrids();
+ startGrids(nodes);
+ }
+
+ int preloadCnt = updateCnt;
+
+ Ignite prim = primaryNode(0L, DEFAULT_CACHE_NAME);
+ List<Ignite> backups = backupNodes(0L, DEFAULT_CACHE_NAME);
+
+ AtomicBoolean prepareBlock = new AtomicBoolean();
+ AtomicBoolean finishBlock = new AtomicBoolean();
+
+ AtomicReference<CountDownLatch> blockLatch = new AtomicReference<>();
+
+ TestRecordingCommunicationSpi.spi(prim).blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+ @Override public boolean apply(ClusterNode node, Message msg) {
+ if ((msg instanceof GridDhtTxPrepareRequest && prepareBlock.get()) ||
+ ((msg instanceof GridDhtTxFinishRequest || msg instanceof GridDhtAtomicSingleUpdateRequest) && finishBlock.get())) {
+ CountDownLatch latch = blockLatch.get();
+
+ assertTrue(latch.getCount() > 0);
+
+ latch.countDown();
+
+ return true; // Generating counter misses.
+ }
+ else
+ return false;
+ }
+ });
+
+ int blockedFrom = reuseKeys ? 0 : preloadCnt;
+ int committedFrom = blockedFrom + 1_000;
+
+ int blockedKey = blockedFrom - 1; // None
+ int committedKey = committedFrom - 1; // None
+
+ List<String> backupMissed = new ArrayList<>();
+ List<String> primaryMissed = new ArrayList<>();
+
+ IgniteCache<Integer, Integer> primCache = prim.cache(DEFAULT_CACHE_NAME);
+
+ Consumer<Integer> cachePut = (key) -> primCache.put(key, -key);
+
+ GridCompoundFuture<?, ?> asyncPutFuts = new GridCompoundFuture<>();
+
+ Consumer<Integer> cachePutAsync = (key) -> asyncPutFuts.add(GridTestUtils.runAsync(() -> cachePut.accept(key)));
+
+ int primaryLwm = -1; // Primary LWM after data preparation (at tx caches).
+ int backupHwm = -1; // Backups HWM after data preparation (at tx caches).
+
+ String backupMissedTail = null; // Misses after backupHwm, which backups are not aware of before the recovery.
+
+ int primaryKeysCnt = preloadCnt; // Keys present on primary.
+
+ int iters = 11;
+
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ // The main idea of this section is to generate missed counters updates, some of them should be on backups only,
+ // some at primary too.
+ // This emulates the reordering that is possible on real, highly loaded clusters.
+ for (int it = 0; it < iters; it++) {
+ boolean first = it == 0;
+ boolean last = it == iters - 1;
+
+ boolean globalBlock = it % 2 != 0 && // Global means all nodes (including primary) will miss the counters updates.
+ // Since an atomic cache commits the entry on primary before sending the request to backups, all misses are only on backups.
+ atomicityMode != ATOMIC;
+
+ if (first || last) // Odd number to gain misses on backups only at first and last iteration.
+ // First iteration will guarantee rebalance for tx caches: Primary LWM > backup LWM.
+ // Last iteration will guarantee rebalance for atomic caches: Primary counter > backup counter.
+ assertTrue(!globalBlock);
+
+ int range = rnd.nextInt(1, 5);
+
+ blockLatch.set(new CountDownLatch(backupNodes * range));
+
+ if (globalBlock)
+ prepareBlock.set(true);
+ else
+ finishBlock.set(true);
+
+ for (int i = 0; i < range; i++) {
+ blockedKey++;
+ updateCnt++;
+
+ if (!globalBlock)
+ primaryKeysCnt++;
+
+ for (Ignite backup : backups) // Check before put.
+ assertEquals(reuseKeys, backup.cache(DEFAULT_CACHE_NAME).get(blockedKey) != null);
+
+ cachePutAsync.accept(blockedKey);
+ }
+
+ blockLatch.get().await();
+
+ prepareBlock.set(false);
+ finishBlock.set(false);
+
+ String missed = range == 1 ? String.valueOf(updateCnt) : (updateCnt - range + 1) + " - " + updateCnt;
+
+ if (last)
+ backupMissedTail = missed;
+ else
+ backupMissed.add(missed);
+
+ if (globalBlock)
+ primaryMissed.add(missed);
+
+ if (!last)
+ for (int i = 0; i < range; i++) {
+ committedKey++;
+ updateCnt++;
+ primaryKeysCnt++;
+
+ cachePut.accept(committedKey);
+ }
+
+ if (first)
+ primaryLwm = updateCnt;
+
+ if (!last)
+ backupHwm = updateCnt;
+ }
+
+ assertNotNull(backupMissedTail);
+ assertTrue(primaryLwm != -1);
+ assertTrue(backupHwm != -1);
+ assertTrue(blockedKey < committedFrom); // Intersection check.
+
+ if (reuseKeys)
+ assertTrue(blockedKey < preloadCnt); // Overflow check.
+
+ for (int key = blockedFrom; key <= blockedKey; key++) {
+ for (Ignite backup : backups) // Check after put.
+ assertEquals(reuseKeys, backup.cache(DEFAULT_CACHE_NAME).get(key) != null);
+ }
+
+ for (int key = committedFrom; key <= committedKey; key++) {
+ assertNotNull(primCache.get(key));
+
+ for (Ignite backup : backups)
+ assertNotNull(backup.cache(DEFAULT_CACHE_NAME).get(key));
+ }
+
+ injectTestSystemOut();
+
+ assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify"));
+
+ assertConflicts(true, true);
+
+ if (atomicityMode == TRANSACTIONAL) {
+ assertTxCounters(primaryLwm, primaryMissed, updateCnt); // Primary
+ assertTxCounters(preloadCnt, backupMissed, backupHwm); // Backups
+ }
+ else {
+ assertAtomicCounters(updateCnt); // Primary
+ assertAtomicCounters(backupHwm); // Backups
+ }
+
+ LogListener lsnrRebalanceType = matches("fullPartitions=[" + (historical ? "" : 0) + "], " +
+ "histPartitions=[" + (historical ? 0 : "") + "]").times(backupNodes).build();
+
+ LogListener lsnrRebalanceAmount = matches("receivedKeys=" +
+ (historical ?
+ atomicityMode == TRANSACTIONAL ?
+ primaryLwm - preloadCnt : // Diff between primary LWM and backup LWM.
+ updateCnt - backupHwm + // Diff between primary and backup counters
+ DFLT_WAL_MARGIN_FOR_ATOMIC_CACHE_HISTORICAL_REBALANCE : // + magic number.
+ reuseKeys ?
+ preloadCnt : // Since keys are reused, amount is the same as at initial preloading.
+ atomicityMode == TRANSACTIONAL ?
+ primaryKeysCnt : // Calculated amount of entries (initial preload amount + updates not missed at primary)
+ updateCnt)) // All updates since only misses on backups generated for atomic caches at this test.
+ .times(backupNodes).build();
+
+ listeningLog.registerListener(lsnrRebalanceType);
+ listeningLog.registerListener(lsnrRebalanceAmount);
+
+ ioBlocked = true; // Emulating power off, OOM or disk overflow. Keeping data as is, with missed counters updates.
+
+ stopAllGrids();
+
+ checkAsyncPutOperationsFinished(asyncPutFuts);
+
+ ioBlocked = false;
+
+ ignite = startGrids(nodes);
+
+ awaitPartitionMapExchange();
+
+ assertTrue(lsnrRebalanceType.check());
+
+ assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify"));
+
+ assertConflicts(
+ atomicityMode == TRANSACTIONAL, // Atomic cache backup counters are automatically set as primary on crash recovery.
+ historical); // Full rebalance fixes the consistency.
+
+ assertTrue(lsnrRebalanceAmount.check());
+
+ if (atomicityMode == TRANSACTIONAL) { // Same as before the crash.
+ assertTxCounters(primaryLwm, primaryMissed, updateCnt); // Primary
+ assertTxCounters(preloadCnt, backupMissed, backupHwm); // Backups
+ }
+ else if (historical) {
+ assertAtomicCounters(updateCnt); // Primary
+
+ // Backups updated with primary counter.
+ // Entries between primary and backup counters are rebalanced.
+ assertNoneAtomicCounters(backupHwm); // Automatically "repaired".
+ }
+ else
+ assertNoneAtomicCounters();
+
+ assertEquals(EXIT_CODE_OK, execute("--consistency", "repair",
+ ConsistencyCommand.CACHE, DEFAULT_CACHE_NAME,
+ ConsistencyCommand.PARTITION, "0",
+ ConsistencyCommand.STRATEGY, strategy.toString()));
+
+ int repairedCnt = repairedEntriesCount();
+
+ assertContains(log, testOut.toString(), historical ? CONSISTENCY_VIOLATIONS_FOUND : NOTHING_FOUND);
+
+ assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify"));
+
+ switch (strategy) {
+ case PRIMARY:
+ case REMOVE:
+ case RELATIVE_MAJORITY:
+ assertConflicts(atomicityMode == TRANSACTIONAL, false); // Inconsistency fixed.
+
+ break;
+
+ case LWW:
+ // LWW can't fix the data when some nodes contain the key, but some do not,
+ // because missed entry has no version and can not be compared.
+ assertConflicts(atomicityMode == TRANSACTIONAL, historical && !reuseKeys);
+
+ break;
+
+ case CHECK_ONLY: // Keeps as is.
+ assertConflicts(atomicityMode == TRANSACTIONAL, historical);
+
+ break;
+
+ default:
+ throw new UnsupportedOperationException("Unsupported strategy");
+ }
+
+ int postloadFrom = reuseKeys ? 0 : (committedKey + 1 /*greater than max key used*/);
+
+ for (int i = postloadFrom; i < postloadFrom + preloadCnt; i++) {
+ updateCnt++;
+
+ ignite.cache(DEFAULT_CACHE_NAME).put(i, i);
+ }
+
+ // Repairing one more time, but with guarantee to fix (primary strategy);
+ assertEquals(EXIT_CODE_OK, execute("--consistency", "repair",
+ ConsistencyCommand.CACHE, DEFAULT_CACHE_NAME,
+ ConsistencyCommand.PARTITION, "0",
+ ConsistencyCommand.STRATEGY, PRIMARY.toString()));
+
+ repairedCnt += repairedEntriesCount();
+
+ updateCnt += repairedCnt;
+
+ assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify"));
+
+ backupMissed.add(backupMissedTail); // Now backups got updates after the generated data and aware of all missed updates.
+
+ if (atomicityMode == TRANSACTIONAL) {
+ assertConflicts(true, false); // Still has counters conflicts, while inconsistency already fixed.
+
+ assertTxCounters(primaryLwm, primaryMissed, updateCnt); // Primary (same as before crash).
+ assertTxCounters(preloadCnt, backupMissed, updateCnt); // Backups (all missed updates, increased hwm).
+ }
+ else
+ assertConflicts(false, false); // Fixed both counters and inconsistency.
+
+ int rmvd = strategy == RELATIVE_MAJORITY || strategy == REMOVE ? repairedCnt /*entries removed during the repair*/ : 0;
+
+ for (Ignite node : G.allGrids()) // Checking the cache size after the fixes.
+ assertEquals((reuseKeys ? preloadCnt : primaryKeysCnt + preloadCnt /*postload*/ - rmvd),
+ node.cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.ALL));
+ }
+
+ /**
+ * Checks idle_verify result.
+ *
+ * @param counter Counter conflicts.
+ * @param hash Hash conflicts.
+ */
+ private void assertConflicts(boolean counter, boolean hash) {
+ if (counter || hash)
+ assertContains(log, testOut.toString(), "conflict partitions has been found: " +
+ "[counterConflicts=" + (counter ? 1 : 0) + ", hashConflicts=" + (hash ? 1 : 0) + "]");
+ else
+ assertContains(log, testOut.toString(), "no conflicts have been found");
+ }
+
+ /**
+ * @param lwm Lwm.
+ * @param missed Missed.
+ * @param hwm Hwm.
+ */
+ private void assertTxCounters(int lwm, List<String> missed, int hwm) {
+ assertContains(log, testOut.toString(),
+ "updateCntr=[lwm=" + lwm + ", missed=" + missed + ", hwm=" + hwm + "]");
+ }
+
+ /**
+ * @param cnt Counter.
+ */
+ private void assertAtomicCounters(int cnt) {
+ assertContains(log, testOut.toString(), "updateCntr=" + cnt);
+ }
+
+ /**
+ * @param cnt Counter.
+ */
+ private void assertNoneAtomicCounters(int... cnt) {
+ assertNotContains(log, testOut.toString(), "updateCntr=" + (F.isEmpty(cnt) ? "" : cnt[0]));
+ }
+
+ /**
+ * Returns amount of entries repaired by Consistency Repair operation.
+ */
+ private int repairedEntriesCount() {
+ Pattern pattern = Pattern.compile("repaired=(\\d*),");
+ Matcher matcher = pattern.matcher(testOut.toString());
+
+ return matcher.find() ?
+ Integer.parseInt(testOut.toString().substring(matcher.start(1), matcher.end(1))) :
+ 0;
+ }
+
+ /**
+ * Checks that all async put operations are finished.
+ */
+ private void checkAsyncPutOperationsFinished(GridCompoundFuture<?, ?> asyncPutFuts) {
+ asyncPutFuts.markInitialized();
+
+ try {
+ asyncPutFuts.get();
+
+ if (atomicityMode != ATOMIC) // Atomics already committed at primary before the fail, so no failure on get() is expected.
+ fail(); // But tx cache is still committing, so get() must throw an exception.
+ }
+ catch (IgniteCheckedException | TransactionHeuristicException ex) {
+ assertTrue(atomicityMode != ATOMIC);
+ }
+ }
+}
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
index 296cee8783a..1669e2251e6 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
@@ -44,10 +44,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BooleanSupplier;
-import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import java.util.regex.Matcher;
@@ -64,7 +62,6 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.ShutdownPolicy;
-import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.BaselineNode;
import org.apache.ignite.cluster.ClusterNode;
@@ -89,7 +86,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
@@ -140,11 +136,9 @@ import org.junit.Test;
import static java.io.File.separatorChar;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_CLUSTER_NAME;
-import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_SAFE;
import static org.apache.ignite.cluster.ClusterState.ACTIVE;
import static org.apache.ignite.cluster.ClusterState.ACTIVE_READ_ONLY;
@@ -2316,162 +2310,6 @@ public class GridCommandHandlerTest extends GridCommandHandlerClusterPerMethodAb
fail("Should be found dump with conflicts");
}
- /**
- * Tests that idle verify checks gaps.
- */
- @Test
- public void testCacheIdleVerifyChecksGapsAtomic() throws Exception {
- testCacheIdleVerifyChecksGaps(ATOMIC);
- }
-
- /**
- * Tests that idle verify checks gaps.
- */
- @Test
- public void testCacheIdleVerifyChecksGapsTx() throws Exception {
- testCacheIdleVerifyChecksGaps(TRANSACTIONAL);
- }
-
- /**
- * Tests that idle verify checks gaps.
- */
- private void testCacheIdleVerifyChecksGaps(CacheAtomicityMode atomicityMode) throws Exception {
- int parts = 1;
-
- IgniteEx ignite = startGrids(3);
-
- ignite.cluster().active(true);
-
- int backups = 2;
-
- IgniteCache<Object, Object> cache = ignite.createCache(new CacheConfiguration<>()
- .setAffinity(new RendezvousAffinityFunction(false, parts))
- .setBackups(backups)
- .setName(DEFAULT_CACHE_NAME)
- .setAtomicityMode(atomicityMode)
- .setWriteSynchronizationMode(PRIMARY_SYNC)
- .setReadFromBackup(true));
-
- int cnt = 0;
-
- for (int i = 0; i < 100; i++) {
- cache.put(i, i);
-
- cnt++;
- }
-
- int cntFrom = cnt;
-
- Ignite prim = primaryNode(0L, DEFAULT_CACHE_NAME);
- Ignite backup = backupNode(0L, DEFAULT_CACHE_NAME);
-
- TestRecordingCommunicationSpi primSpi = TestRecordingCommunicationSpi.spi(prim);
-
- AtomicReference<CountDownLatch> latchRef = new AtomicReference<>();
-
- primSpi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
- @Override public boolean apply(ClusterNode node, Message msg) {
- if (msg instanceof GridDhtTxFinishRequest ||
- msg instanceof GridDhtAtomicSingleUpdateRequest) {
- CountDownLatch blockLatch = latchRef.get();
-
- boolean block = blockLatch.getCount() > 0;
-
- blockLatch.countDown();
-
- return block; // Generating counter gaps.
- }
- else
- return false;
- }
- });
-
- int blockedKey = cntFrom + 1_000;
- int committedKey = blockedKey + 1_000;
-
- int blockedFrom = blockedKey;
- int committedFrom = committedKey;
-
- ThreadLocalRandom rnd = ThreadLocalRandom.current();
-
- List<String> gaps = new ArrayList<>();
-
- Consumer<Integer> cachePut = (key) -> {
- if (atomicityMode == TRANSACTIONAL)
- try (Transaction tx = prim.transactions().txStart()) {
- prim.cache(DEFAULT_CACHE_NAME).put(key, key);
-
- tx.commit();
- }
- else
- prim.cache(DEFAULT_CACHE_NAME).put(key, key);
- };
-
- for (int it = 0; it < 10; it++) {
- int range = rnd.nextInt(3);
-
- CountDownLatch blockLatch = new CountDownLatch(backups * range);
-
- latchRef.set(blockLatch);
-
- for (int i = 0; i < range; i++) {
- cachePut.accept(blockedKey++);
-
- cnt++;
- }
-
- if (range == 1)
- gaps.add(String.valueOf(cnt));
- else if (range > 1)
- gaps.add((cnt - range + 1) + " - " + cnt);
-
- blockLatch.await();
-
- for (int i = 0; i < range; i++) {
- cachePut.accept(committedKey++);
-
- cnt++;
- }
- }
-
- for (int key = blockedFrom; key < blockedKey; key++) {
- assertNotNull(prim.getOrCreateCache(DEFAULT_CACHE_NAME).get(key));
- assertNull(backup.getOrCreateCache(DEFAULT_CACHE_NAME).get(key)); // Commit is blocked.
- }
-
- for (int key = committedFrom; key < committedKey; key++) {
- assertNotNull(prim.getOrCreateCache(DEFAULT_CACHE_NAME).get(key));
- assertNotNull(backup.getOrCreateCache(DEFAULT_CACHE_NAME).get(key));
- }
-
- G.restart(true);
-
- ignite.cluster().active(true);
-
- awaitPartitionMapExchange();
-
- injectTestSystemOut();
-
- assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify"));
-
- if (atomicityMode == TRANSACTIONAL) {
- assertContains(log, testOut.toString(), "conflict partitions has been found: [counterConflicts=1, " +
- "hashConflicts=1]");
-
- assertContains(log, testOut.toString(),
- "updateCntr=[lwm=" + cnt + ", missed=[], hwm=" + cnt + "]"); // Primary
-
- assertContains(log, testOut.toString(),
- "updateCntr=[lwm=" + cntFrom + ", missed=" + gaps + ", hwm=" + cnt + "]"); // Backups.
- }
- else {
- assertContains(log, testOut.toString(), "conflict partitions has been found: [counterConflicts=0, " +
- "hashConflicts=1]");
-
- assertContains(log, testOut.toString(), "updateCntr=" + cnt); // All
- }
- }
-
/**
* @return Build matcher for dump file name.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterMvccImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterMvccImpl.java
index 2e3066a2a04..5c1b2844a1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterMvccImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterMvccImpl.java
@@ -49,11 +49,11 @@ public class PartitionUpdateCounterMvccImpl extends PartitionUpdateCounterTracki
@Override public PartitionUpdateCounter copy() {
PartitionUpdateCounterMvccImpl copy = new PartitionUpdateCounterMvccImpl(grp);
- copy.cntr.set(cntr.get());
+ copy.lwm.set(lwm.get());
copy.first = first;
copy.queue = new TreeMap<>(queue);
copy.initCntr = initCntr;
- copy.reserveCntr.set(reserveCntr.get());
+ copy.reservedCntr.set(reservedCntr.get());
return copy;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterTrackingImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterTrackingImpl.java
index 80bb1807cde..e75872b2cc5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterTrackingImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterTrackingImpl.java
@@ -74,10 +74,10 @@ public class PartitionUpdateCounterTrackingImpl implements PartitionUpdateCounte
protected NavigableMap<Long, Item> queue = new TreeMap<>();
/** LWM. */
- protected final AtomicLong cntr = new AtomicLong();
+ protected final AtomicLong lwm = new AtomicLong();
- /** HWM. */
- protected final AtomicLong reserveCntr = new AtomicLong();
+ /** Reserved. */
+ protected final AtomicLong reservedCntr = new AtomicLong();
/** */
protected boolean first = true;
@@ -100,13 +100,13 @@ public class PartitionUpdateCounterTrackingImpl implements PartitionUpdateCounte
/** {@inheritDoc} */
@Override public void init(long initUpdCntr, @Nullable byte[] cntrUpdData) {
- cntr.set(initUpdCntr);
+ lwm.set(initUpdCntr);
initCntr = initUpdCntr;
queue = fromBytes(cntrUpdData);
- reserveCntr.set(highestAppliedCounter());
+ reservedCntr.set(highestAppliedCounter());
}
/** {@inheritDoc} */
@@ -116,21 +116,21 @@ public class PartitionUpdateCounterTrackingImpl implements PartitionUpdateCounte
/** {@inheritDoc} */
@Override public long get() {
- return cntr.get();
+ return lwm.get();
}
/** */
protected synchronized long highestAppliedCounter() {
- return queue.isEmpty() ? cntr.get() : queue.lastEntry().getValue().absolute();
+ return queue.isEmpty() ? lwm.get() : queue.lastEntry().getValue().absolute();
}
/**
* @return Next update counter. For tx mode called by {@link DataStreamerImpl} IsolatedUpdater.
*/
@Override public long next() {
- long next = cntr.incrementAndGet();
+ long next = lwm.incrementAndGet();
- reserveCntr.set(next);
+ reservedCntr.set(next);
return next;
}
@@ -138,25 +138,27 @@ public class PartitionUpdateCounterTrackingImpl implements PartitionUpdateCounte
/** {@inheritDoc} */
@Override public synchronized void update(long val) throws IgniteCheckedException {
// Reserved update counter is updated only on exchange.
- long cur = get();
+ long curLwm = lwm.get();
// Always set reserved counter equal to max known counter.
- long max = Math.max(val, cur);
+ long max = Math.max(val, curLwm);
+ long reserved = reservedCntr.get();
- if (reserveCntr.get() < max)
- reserveCntr.set(max);
+ if (reserved < max)
+ reservedCntr.set(max);
// Outdated counter (txs are possible before current topology future is finished if primary is not changed).
- if (val < cur)
+ if (val < curLwm)
return;
// Absolute counter should be not less than last applied update.
// Otherwise supplier doesn't contain some updates and rebalancing couldn't restore consistency.
// Best behavior is to stop node by failure handler in such a case.
if (val < highestAppliedCounter())
- throw new IgniteCheckedException("Failed to update the counter [newVal=" + val + ", curState=" + this + ']');
+ throw new IgniteCheckedException("Failed to update the counter " +
+ "[newVal=" + val + ", prevReserved=" + reserved + ", curState=" + this + ']');
- cntr.set(val);
+ lwm.set(val);
/** If some holes are present at this point, that means some updates were missed on recovery and will be restored
* during rebalance. All gaps are safe to "forget".
@@ -170,7 +172,7 @@ public class PartitionUpdateCounterTrackingImpl implements PartitionUpdateCounte
/** {@inheritDoc} */
@Override public synchronized boolean update(long start, long delta) {
- long cur = cntr.get();
+ long cur = lwm.get();
if (cur > start)
return false;
@@ -213,7 +215,7 @@ public class PartitionUpdateCounterTrackingImpl implements PartitionUpdateCounte
if (nextItem != null)
next += nextItem.delta;
- boolean res = cntr.compareAndSet(cur, next);
+ boolean res = lwm.compareAndSet(cur, next);
assert res;
@@ -227,8 +229,8 @@ public class PartitionUpdateCounterTrackingImpl implements PartitionUpdateCounte
initCntr = get();
- if (reserveCntr.get() < initCntr)
- reserveCntr.set(initCntr);
+ if (reservedCntr.get() < initCntr)
+ reservedCntr.set(initCntr);
}
/** {@inheritDoc} */
@@ -241,37 +243,37 @@ public class PartitionUpdateCounterTrackingImpl implements PartitionUpdateCounte
if (gaps == null)
gaps = new GridLongList((queue.size() + 1) * 2);
- long start = cntr.get() + 1;
+ long start = lwm.get() + 1;
long end = item.getValue().start;
gaps.add(start);
gaps.add(end);
// Close pending ranges.
- cntr.set(item.getValue().absolute());
+ lwm.set(item.getValue().absolute());
item = queue.pollFirstEntry();
}
- reserveCntr.set(get());
+ reservedCntr.set(get());
return gaps;
}
/** {@inheritDoc} */
@Override public synchronized long reserve(long delta) {
- long cntr = get();
+ long lwm = get();
- long reserved = reserveCntr.getAndAdd(delta);
+ long reserved = reservedCntr.getAndAdd(delta);
- assert reserved >= cntr : "LWM after HWM: lwm=" + cntr + ", hwm=" + reserved + ", cntr=" + toString();
+ assert reserved >= lwm : "LWM after reserved: lwm=" + lwm + ", reserved=" + reserved + ", cntr=" + this;
return reserved;
}
/** {@inheritDoc} */
@Override public long next(long delta) {
- return cntr.getAndAdd(delta);
+ return lwm.getAndAdd(delta);
}
/** {@inheritDoc} */
@@ -344,9 +346,9 @@ public class PartitionUpdateCounterTrackingImpl implements PartitionUpdateCounte
@Override public synchronized void reset() {
initCntr = 0;
- cntr.set(0);
+ lwm.set(0);
- reserveCntr.set(0);
+ reservedCntr.set(0);
queue.clear();
}
@@ -361,7 +363,7 @@ public class PartitionUpdateCounterTrackingImpl implements PartitionUpdateCounte
*/
private static class Item {
/** */
- private long start;
+ private final long start;
/** */
private long delta;
@@ -431,12 +433,12 @@ public class PartitionUpdateCounterTrackingImpl implements PartitionUpdateCounte
if (!queue.equals(cntr.queue))
return false;
- return this.cntr.get() == cntr.cntr.get();
+ return lwm.get() == cntr.lwm.get();
}
/** {@inheritDoc} */
@Override public long reserved() {
- return reserveCntr.get();
+ return reservedCntr.get();
}
/** {@inheritDoc} */
@@ -452,21 +454,21 @@ public class PartitionUpdateCounterTrackingImpl implements PartitionUpdateCounte
/**
* Human-readable missed unordered updates.
*/
- private String gaps() {
- List<String> gaps = new ArrayList<>();
+ private String missed() {
+ List<String> missed = new ArrayList<>();
- long prev = cntr.get();
+ long prev = lwm.get();
for (Item item : queue.values()) {
if (prev + 1 == item.start)
- gaps.add(String.valueOf(item.start));
+ missed.add(String.valueOf(item.start));
else
- gaps.add((prev + 1) + " - " + item.start);
+ missed.add((prev + 1) + " - " + item.start);
prev = item.start + item.delta;
}
- return gaps.toString();
+ return missed.toString();
}
/** {@inheritDoc} */
@@ -476,7 +478,7 @@ public class PartitionUpdateCounterTrackingImpl implements PartitionUpdateCounte
long hwm;
synchronized (this) {
- missed = gaps();
+ missed = missed();
lwm = get();
@@ -499,16 +501,16 @@ public class PartitionUpdateCounterTrackingImpl implements PartitionUpdateCounte
String missed;
long lwm;
long hwm;
- long maxApplied;
+ long reserved;
synchronized (this) {
- missed = gaps();
+ missed = missed();
lwm = get();
- hwm = reserveCntr.get();
+ hwm = highestAppliedCounter();
- maxApplied = highestAppliedCounter();
+ reserved = reservedCntr.get();
}
return new SB()
@@ -516,10 +518,10 @@ public class PartitionUpdateCounterTrackingImpl implements PartitionUpdateCounte
.a(lwm)
.a(", missed=")
.a(missed)
- .a(", maxApplied=")
- .a(maxApplied)
.a(", hwm=")
.a(hwm)
+ .a(", reserved=")
+ .a(reserved)
.a(']')
.toString();
}
@@ -533,11 +535,11 @@ public class PartitionUpdateCounterTrackingImpl implements PartitionUpdateCounte
@Override public PartitionUpdateCounter copy() {
PartitionUpdateCounterTrackingImpl copy = createInstance();
- copy.cntr.set(cntr.get());
+ copy.lwm.set(lwm.get());
copy.first = first;
copy.queue = new TreeMap<>(queue);
copy.initCntr = initCntr;
- copy.reserveCntr.set(reserveCntr.get());
+ copy.reservedCntr.set(reservedCntr.get());
return copy;
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSpuriousRebalancingOnNodeJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSpuriousRebalancingOnNodeJoinTest.java
index 960fa7f4ace..c7df9186058 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSpuriousRebalancingOnNodeJoinTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSpuriousRebalancingOnNodeJoinTest.java
@@ -154,7 +154,7 @@ public class IgnitePdsSpuriousRebalancingOnNodeJoinTest extends GridCommonAbstra
PartitionUpdateCounterTrackingImpl delegate = U.field(cntr0, "delegate");
- AtomicLong cntr = U.field(delegate, "cntr");
+ AtomicLong cntr = U.field(delegate, "lwm");
cntr.set(cntr.get() - 1);
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
index d9f8ade92ae..31c4b9200e6 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
@@ -1127,7 +1127,7 @@ public final class GridTestUtils {
* @param task Runnable.
* @return Future with task result.
*/
- public static IgniteInternalFuture runAsync(final Runnable task) {
+ public static <T> IgniteInternalFuture<T> runAsync(final Runnable task) {
return runAsync(task, "async-runnable-runner");
}
@@ -1137,7 +1137,7 @@ public final class GridTestUtils {
* @param task Runnable.
* @return Future with task result.
*/
- public static IgniteInternalFuture runAsync(final RunnableX task) {
+ public static <T> IgniteInternalFuture<T> runAsync(final RunnableX task) {
return runAsync(task, "async-runnable-runner");
}
@@ -1147,7 +1147,7 @@ public final class GridTestUtils {
* @param task Runnable.
* @return Future with task result.
*/
- public static IgniteInternalFuture runAsync(final Runnable task, String threadName) {
+ public static <T> IgniteInternalFuture<T> runAsync(final Runnable task, String threadName) {
return runAsync(() -> {
task.run();
@@ -1161,7 +1161,7 @@ public final class GridTestUtils {
* @param task Runnable.
* @return Future with task result.
*/
- public static IgniteInternalFuture runAsync(final RunnableX task, String threadName) {
+ public static <T> IgniteInternalFuture<T> runAsync(final RunnableX task, String threadName) {
return runAsync(() -> {
task.run();