You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2022/05/24 15:09:30 UTC

[GitHub] [ignite] anton-vinogradov commented on a diff in pull request #9907: IGNITE-15329 Atomics should be repairable by Read Repair

anton-vinogradov commented on code in PR #9907:
URL: https://github.com/apache/ignite/pull/9907#discussion_r880639102


##########
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/ReadRepairDataGenerator.java:
##########
@@ -0,0 +1,657 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.consistency;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteBinary;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.ReadRepairStrategy;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager;
+import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
+import org.apache.ignite.internal.processors.dr.GridDrType;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.testframework.GridTestUtils;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ *
+ */
+public class ReadRepairDataGenerator {
+    /** Key. */
+    private final AtomicInteger incrementalKey = new AtomicInteger();
+
+    /** Cache name. */
+    private final String cacheName;
+
+    /** Nodes aware of the entry value class. */
+    private final List<Ignite> clsAwareNodes;
+
+    /** External class loader. */
+    private final ClassLoader extClsLdr;
+
+    /** Primary node. */
+    private final BiFunction<Integer, String, Ignite> primaryNode;
+
+    /** Backup nodes. */
+    private final BiFunction<Integer, String, List<Ignite>> backupNodes;
+
+    /** Server nodes count. */
+    private final Supplier<Integer> serverNodesCnt;
+
+    /** Backups count. */
+    private final Supplier<Integer> backupsCnt;
+
+    /**
+     * @param cacheName Cache name.
+     * @param clsAwareNodes Class aware nodes.
+     * @param extClsLdr Ext class loader.
+     * @param primaryNode Primary node.
+     * @param backupNodes Backup nodes.
+     * @param serverNodesCnt Server nodes count.
+     * @param backupsCnt Backups count.
+     */
+    public ReadRepairDataGenerator(
+        String cacheName,
+        List<Ignite> clsAwareNodes,
+        ClassLoader extClsLdr,
+        BiFunction<Integer, String, Ignite> primaryNode,
+        BiFunction<Integer, String, List<Ignite>> backupNodes,
+        Supplier<Integer> serverNodesCnt,
+        Supplier<Integer> backupsCnt) {
+        this.cacheName = cacheName;
+        this.clsAwareNodes = Collections.unmodifiableList(clsAwareNodes);
+        this.extClsLdr = extClsLdr;
+        this.primaryNode = primaryNode;
+        this.backupNodes = backupNodes;
+        this.serverNodesCnt = serverNodesCnt;
+        this.backupsCnt = backupsCnt;
+    }
+
+    /**
+     * Generates inconsistent data and checks it repairs properly.
+     *
+     * @param initiator Node used to perform the Read Repair operation during the check.
+     * @param cnt       Count of entries to be generated/checked.
+     * @param raw       Raw read flag. True means required GetEntry() instead of get().
+     * @param async     Async read flag.
+     * @param misses    Skiping entries generation on some owners.
+     * @param nulls     Removing entries after the generation on some nodes.
+     * @param binary    Read Repair will be performed with keeping data binary.
+     * @param strategy  Strategy to perform the Read Repair.
+     * @param c         Lambda consumes generated data and performs the Read Repair check.
+     */
+    public void generateAndCheck(
+        Ignite initiator,
+        int cnt,
+        boolean raw,
+        boolean async,
+        boolean misses,
+        boolean nulls,
+        boolean binary,
+        ReadRepairStrategy strategy,
+        Consumer<ReadRepairData> c) throws Exception {
+        IgniteCache<Integer, Object> cache = initiator.getOrCreateCache(cacheName);
+
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        ReadRepairStrategy[] strategies = ReadRepairStrategy.values();
+
+        for (int i = 0; i < rnd.nextInt(1, 10); i++) {
+            ReadRepairStrategy keyStrategy = strategy != null ? strategy : strategies[rnd.nextInt(strategies.length)];
+
+            Map<Integer, InconsistentMapping> results = new TreeMap<>(); // Sorted to avoid warning.
+
+            try {
+                for (int j = 0; j < cnt; j++) {
+                    int curKey = incrementalKey.incrementAndGet();
+
+                    InconsistentMapping res = setDifferentValuesForSameKey(curKey, misses, nulls, keyStrategy);
+
+                    results.put(curKey, res);
+                }
+
+                for (Ignite node : G.allGrids()) { // Check that cache filled properly.
+                    Map<Integer, Object> all =
+                        node.getOrCreateCache(cacheName).<Integer, Object>withKeepBinary().getAll(results.keySet());
+
+                    for (Map.Entry<Integer, Object> entry : all.entrySet()) {
+                        Integer key = entry.getKey();
+                        Object val = entry.getValue();
+
+                        T2<Object, GridCacheVersion> valVer = results.get(key).mappingBin.get(node);
+
+                        Object exp;
+
+                        if (valVer != null)
+                            exp = valVer.get1(); // Should read from itself (backup or primary).
+                        else
+                            exp = results.get(key).primaryBin; // Or read from primary (when not a partition owner).
+
+                        assertEquals(exp, val);
+                    }
+                }
+
+                c.accept(new ReadRepairData(cache, results, raw, async, keyStrategy, binary));
+            }
+            catch (Throwable th) {
+                StringBuilder sb = new StringBuilder();
+
+                sb.append("Read Repair test failed [")
+                    .append("cache=").append(cache.getName())
+                    .append(", strategy=").append(keyStrategy)
+                    .append("]\n");
+
+                for (Map.Entry<Integer, InconsistentMapping> entry : results.entrySet()) {
+                    sb.append("Key: ").append(entry.getKey()).append("\n");
+
+                    InconsistentMapping mapping = entry.getValue();
+
+                    sb.append(" Generated data [primary=").append(unwrapBinaryIfNeeded(mapping.primaryBin))
+                        .append(", repaired=").append(unwrapBinaryIfNeeded(mapping.repairedBin))
+                        .append(", repairable=").append(mapping.repairable)
+                        .append(", consistent=").append(mapping.consistent)
+                        .append("]\n");
+
+                    sb.append("  Distribution: \n");
+
+                    for (Map.Entry<Ignite, T2<Object, GridCacheVersion>> dist : mapping.mappingBin.entrySet()) {
+                        sb.append("   Node: ").append(dist.getKey().name()).append("\n");
+                        sb.append("    Value: ").append(unwrapBinaryIfNeeded(dist.getValue().get1())).append("\n");
+                        sb.append("    Version: ").append(dist.getValue().get2()).append("\n");
+                    }
+
+                    sb.append("\n");
+                }
+
+                throw new Exception(sb.toString(), th);
+            }
+        }
+    }
+
+    /**
+     * Generated entries count.
+     */
+    public int generated() {
+        return incrementalKey.get();
+    }
+
+    /**
+     *
+     */
+    private InconsistentMapping setDifferentValuesForSameKey(
+        int key,
+        boolean misses,
+        boolean nulls,
+        ReadRepairStrategy strategy) throws Exception {
+        List<Ignite> nodes = new ArrayList<>();
+        Map<Ignite, T2<Object, GridCacheVersion>> mapping = new HashMap<>();
+
+        Ignite primary = primaryNode.apply(key, cacheName);
+
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        boolean wrap = rnd.nextBoolean();

Review Comment:
   Can't, 2 usages 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org