You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/12/15 16:41:27 UTC

[GitHub] [ignite] anton-vinogradov opened a new pull request #9661: IGNITE-15330 Read Repair should support strategies

anton-vinogradov opened a new pull request #9661:
URL: https://github.com/apache/ignite/pull/9661


   Thank you for submitting the pull request to the Apache Ignite.
   
   In order to streamline the review of the contribution 
   we ask you to ensure the following steps have been taken:
   
   ### The Contribution Checklist
   - [ ] There is a single JIRA ticket related to the pull request. 
   - [ ] The web-link to the pull request is attached to the JIRA ticket.
   - [ ] The JIRA ticket has the _Patch Available_ state.
   - [ ] The pull request body describes changes that have been made. 
   The description explains _WHAT_ and _WHY_ was made instead of _HOW_.
   - [ ] The pull request title is treated as the final commit message. 
   The following pattern must be used: `IGNITE-XXXX Change summary` where `XXXX` - number of JIRA issue.
   - [ ] A reviewer has been mentioned through the JIRA comments 
   (see [the Maintainers list](https://cwiki.apache.org/confluence/display/IGNITE/How+to+Contribute#HowtoContribute-ReviewProcessandMaintainers)) 
   - [ ] The pull request has been checked by the Teamcity Bot and 
   the `green visa` attached to the JIRA ticket (see [TC.Bot: Check PR](https://mtcga.gridgain.com/prs.html))
   
   ### Notes
   - [How to Contribute](https://cwiki.apache.org/confluence/display/IGNITE/How+to+Contribute)
   - [Coding abbreviation rules](https://cwiki.apache.org/confluence/display/IGNITE/Abbreviation+Rules)
   - [Coding Guidelines](https://cwiki.apache.org/confluence/display/IGNITE/Coding+Guidelines)
   - [Apache Ignite Teamcity Bot](https://cwiki.apache.org/confluence/display/IGNITE/Apache+Ignite+Teamcity+Bot)
   
   If you need any help, please email dev@ignite.apache.org or ask anу advice on http://asf.slack.com _#ignite_ channel.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] timoninmaxim commented on a change in pull request #9661: IGNITE-15330 Read Repair should support strategies

Posted by GitBox <gi...@apache.org>.
timoninmaxim commented on a change in pull request #9661:
URL: https://github.com/apache/ignite/pull/9661#discussion_r773790497



##########
File path: modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencyTest.java
##########
@@ -187,36 +215,48 @@ public void testRepairNonExistentCache() throws Exception {
     /**
      *
      */
-    private void readRepairTx(AtomicInteger brokenParts, String cacheName) {
+    private void readRepair(AtomicInteger brokenParts, String cacheName, Integer fixesPerEntry) {
         for (int i = 0; i < PARTITIONS; i++) {
-            assertEquals(EXIT_CODE_OK, execute("--consistency", "repair", cacheName, String.valueOf(i)));
+            assertEquals(EXIT_CODE_OK, execute("--consistency", "repair", cacheName, String.valueOf(i), strategy.toString()));
             assertContains(log, testOut.toString(), CONSISTENCY_VIOLATIONS_FOUND);
-            assertContains(log, testOut.toString(), "[found=1, fixed=1");
+            assertContains(log, testOut.toString(), "[found=1, fixed=" + (fixesPerEntry != null ? fixesPerEntry.toString() : ""));
 
             assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify"));
 
-            brokenParts.decrementAndGet();
+            if (fixesPerEntry != null)
+                if (fixesPerEntry > 0) {
+                    brokenParts.decrementAndGet();
 
-            if (brokenParts.get() > 0)
-                assertContains(log, testOut.toString(),
-                    "conflict partitions has been found: [counterConflicts=0, hashConflicts=" + brokenParts);
-            else
-                assertContains(log, testOut.toString(), "no conflicts have been found");
+                    if (brokenParts.get() > 0)
+                        assertContains(log, testOut.toString(),
+                            "conflict partitions has been found: [counterConflicts=0, hashConflicts=" + brokenParts);
+                    else
+                        assertContains(log, testOut.toString(), "no conflicts have been found");
+                }
+                else
+                    assertContains(log, testOut.toString(),
+                        "conflict partitions has been found: [counterConflicts=0, hashConflicts=" + brokenParts); // Nothing fixed.
         }
     }
 
     /**
      *
      */
-    private void readRepaitAtomic(AtomicInteger brokenParts, String cacheName) {
-        for (int i = 0; i < PARTITIONS; i++) { // This may be a copy of previous (tx case), implement atomic repair to make this happen :)
-            assertEquals(EXIT_CODE_OK, execute("--consistency", "repair", cacheName, String.valueOf(i)));
-            assertContains(log, testOut.toString(), CONSISTENCY_VIOLATIONS_FOUND);
-            assertContains(log, testOut.toString(), "[found=1, fixed=0"); // Nothing fixed.
+    private Integer fixesPerEntry() {
+        switch (strategy) {
+            case PRIMARY:
+            case REMOVE:
+                return 1;
 
-            assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify"));
-            assertContains(log, testOut.toString(),
-                "conflict partitions has been found: [counterConflicts=0, hashConflicts=" + brokenParts); // Nothing fixed.
+            case CHECK_ONLY:
+                return 0;
+
+            case MAJORITY:
+            case LWW:
+                return null; // Who knows :)

Review comment:
       I think a test has to be written in the such way that it actually knows what exactly should be fixed.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
##########
@@ -4833,6 +4835,9 @@ private boolean clearLocally0(K key, boolean readers) {
                 deserializeBinary,
                 needVer);
         }
+        catch (IgniteIrreparableConsistencyViolationException e) {
+            throw e;
+        }
         catch (IgniteConsistencyViolationException e) {
             repairAsync(key, ctx.operationContextPerCall(), false).get();

Review comment:
       IMHO, recursion without a limit param is always a bad solution. It looks ok that if `repairAsync` is successful then `repairableGet` will be successfull too. But in one moment due to some bug we can get a StackOverflowException here.

##########
File path: modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencyTest.java
##########
@@ -187,36 +215,48 @@ public void testRepairNonExistentCache() throws Exception {
     /**
      *
      */
-    private void readRepairTx(AtomicInteger brokenParts, String cacheName) {
+    private void readRepair(AtomicInteger brokenParts, String cacheName, Integer fixesPerEntry) {
         for (int i = 0; i < PARTITIONS; i++) {
-            assertEquals(EXIT_CODE_OK, execute("--consistency", "repair", cacheName, String.valueOf(i)));
+            assertEquals(EXIT_CODE_OK, execute("--consistency", "repair", cacheName, String.valueOf(i), strategy.toString()));
             assertContains(log, testOut.toString(), CONSISTENCY_VIOLATIONS_FOUND);
-            assertContains(log, testOut.toString(), "[found=1, fixed=1");
+            assertContains(log, testOut.toString(), "[found=1, fixed=" + (fixesPerEntry != null ? fixesPerEntry.toString() : ""));
 
             assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify"));
 
-            brokenParts.decrementAndGet();
+            if (fixesPerEntry != null)
+                if (fixesPerEntry > 0) {
+                    brokenParts.decrementAndGet();
 
-            if (brokenParts.get() > 0)
-                assertContains(log, testOut.toString(),
-                    "conflict partitions has been found: [counterConflicts=0, hashConflicts=" + brokenParts);
-            else
-                assertContains(log, testOut.toString(), "no conflicts have been found");
+                    if (brokenParts.get() > 0)
+                        assertContains(log, testOut.toString(),
+                            "conflict partitions has been found: [counterConflicts=0, hashConflicts=" + brokenParts);
+                    else
+                        assertContains(log, testOut.toString(), "no conflicts have been found");
+                }
+                else
+                    assertContains(log, testOut.toString(),
+                        "conflict partitions has been found: [counterConflicts=0, hashConflicts=" + brokenParts); // Nothing fixed.
         }
     }
 
     /**
      *
      */
-    private void readRepaitAtomic(AtomicInteger brokenParts, String cacheName) {
-        for (int i = 0; i < PARTITIONS; i++) { // This may be a copy of previous (tx case), implement atomic repair to make this happen :)
-            assertEquals(EXIT_CODE_OK, execute("--consistency", "repair", cacheName, String.valueOf(i)));
-            assertContains(log, testOut.toString(), CONSISTENCY_VIOLATIONS_FOUND);
-            assertContains(log, testOut.toString(), "[found=1, fixed=0"); // Nothing fixed.
+    private Integer fixesPerEntry() {
+        switch (strategy) {
+            case PRIMARY:
+            case REMOVE:
+                return 1;
 
-            assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify"));
-            assertContains(log, testOut.toString(),
-                "conflict partitions has been found: [counterConflicts=0, hashConflicts=" + brokenParts); // Nothing fixed.
+            case CHECK_ONLY:
+                return 0;
+
+            case MAJORITY:
+            case LWW:
+                return null; // Who knows :)
+
+            default:
+                throw new UnsupportedOperationException("Unsupported trategy");

Review comment:
       s/trategy/strategy/

##########
File path: modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencyTest.java
##########
@@ -187,36 +215,48 @@ public void testRepairNonExistentCache() throws Exception {
     /**
      *
      */
-    private void readRepairTx(AtomicInteger brokenParts, String cacheName) {
+    private void readRepair(AtomicInteger brokenParts, String cacheName, Integer fixesPerEntry) {
         for (int i = 0; i < PARTITIONS; i++) {
-            assertEquals(EXIT_CODE_OK, execute("--consistency", "repair", cacheName, String.valueOf(i)));
+            assertEquals(EXIT_CODE_OK, execute("--consistency", "repair", cacheName, String.valueOf(i), strategy.toString()));
             assertContains(log, testOut.toString(), CONSISTENCY_VIOLATIONS_FOUND);
-            assertContains(log, testOut.toString(), "[found=1, fixed=1");
+            assertContains(log, testOut.toString(), "[found=1, fixed=" + (fixesPerEntry != null ? fixesPerEntry.toString() : ""));
 
             assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify"));
 
-            brokenParts.decrementAndGet();
+            if (fixesPerEntry != null)

Review comment:
       Actually there is no any check in `testAtomicAndTx` and `testCacheFilter` for MAJORITY/LWW strategy. If `fixesPerEntry` equals to `null`, there is no checks at all, except of single run of the command without additional validations.

##########
File path: modules/core/src/main/java/org/apache/ignite/IgniteCache.java
##########
@@ -188,6 +189,19 @@
      * <li>{@link IgniteCache#get} && {@link IgniteCache#getAsync}</li>
      * <li>{@link IgniteCache#getAll} && {@link IgniteCache#getAllAsync}</li>
      * </ul>
+     * @param strategy Read Repair strategy.
+     * @return Cache with explicit consistency check on each read and repair if necessary.
+     */
+    @IgniteExperimental
+    public IgniteCache<K, V> withReadRepair(ReadRepairStrategy strategy);

Review comment:
       AFAIU, `withReadRepair()` is low-level API, that assumed to be used by external utils for specific cases with big care. So, I don't like the idea of "default" strategy. Also, it's just an experimental API, so i think we should make it as narrow as possible.
   
   Let's enforce user to use this API with strict understanding what does he/she want to fix and how, then `strategy` will be a required parameter. Also we can write in javadocs some info like "In most cases the strategy LWW should be OK, but check if it serves your needs." 
   
   But this API is still experimental, so default behavior can be changed in the future after users actually starts use it. And it will be impossible to change the hardcoded default. But instead we can change javadocs only. 
   
   WDYT?

##########
File path: modules/core/src/main/java/org/apache/ignite/IgniteCache.java
##########
@@ -188,6 +189,19 @@
      * <li>{@link IgniteCache#get} && {@link IgniteCache#getAsync}</li>
      * <li>{@link IgniteCache#getAll} && {@link IgniteCache#getAllAsync}</li>
      * </ul>
+     * @param strategy Read Repair strategy.
+     * @return Cache with explicit consistency check on each read and repair if necessary.
+     */
+    @IgniteExperimental
+    public IgniteCache<K, V> withReadRepair(ReadRepairStrategy strategy);
+
+    /**
+     * <b>This is an experimental API.</b>
+     * <p>
+     * Gets an instance of {@code IgniteCache} that will perform backup nodes check on each get attempt with default
+     * conflict resolve strategy.
+     *
+     * @see IgniteCache#withReadRepair(ReadRepairStrategy) for defails.

Review comment:
       s/defails/details/

##########
File path: modules/core/src/main/java/org/apache/ignite/events/CacheConsistencyViolationEvent.java
##########
@@ -67,39 +68,58 @@
     private static final long serialVersionUID = 0L;
 
     /** Represents original values of entries.*/
-    final Map<Object, Map<ClusterNode, EntryInfo>> entries;
+    final Map<?, Map<ClusterNode, EntryInfo>> entries;

Review comment:
       But `GridNearReadRepairAbstractFuture` creates this event with the `Object` class in generics for both `entries` and `fixed`. Why do we need this change? 

##########
File path: docs/_docs/tools/control-script.adoc
##########
@@ -1052,6 +1053,7 @@ Parameters:
 | Parameter | Description
 | `cache-name`| Cache to be checked/repaired..

Review comment:
       Double dot in the end of line.

##########
File path: modules/core/src/main/java/org/apache/ignite/cache/ReadRepairStrategy.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache;
+
+import org.apache.ignite.IgniteCache;
+
+/**
+ * Read repair strategies.

Review comment:
       Let's add some more docs here. I think we should write here about known trade-offs of different methods, or write that actually they have similar performance. 

##########
File path: modules/core/src/main/java/org/apache/ignite/events/CacheConsistencyViolationEvent.java
##########
@@ -67,39 +68,58 @@
     private static final long serialVersionUID = 0L;
 
     /** Represents original values of entries.*/
-    final Map<Object, Map<ClusterNode, EntryInfo>> entries;
+    final Map<?, Map<ClusterNode, EntryInfo>> entries;
+
+    /** Fixed entries. */
+    final Map<?, ?> fixed;
 
     /** Cache name. */
     final String cacheName;
 
+    /** Strategy. */
+    final ReadRepairStrategy strategy;

Review comment:
       let's make all fields `private`

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java
##########
@@ -50,7 +51,7 @@
     private final boolean recovery;
 
     /** Read-repair flag. */

Review comment:
       Fix javadoc here too

##########
File path: modules/core/src/main/java/org/apache/ignite/events/CacheConsistencyViolationEvent.java
##########
@@ -67,39 +68,58 @@
     private static final long serialVersionUID = 0L;
 
     /** Represents original values of entries.*/
-    final Map<Object, Map<ClusterNode, EntryInfo>> entries;
+    final Map<?, Map<ClusterNode, EntryInfo>> entries;
+
+    /** Fixed entries. */
+    final Map<?, ?> fixed;
 
     /** Cache name. */
     final String cacheName;
 
+    /** Strategy. */
+    final ReadRepairStrategy strategy;
+
     /**
      * Creates a new instance of CacheConsistencyViolationEvent.
-     *
-     * @param cacheName Cache name.
+     *  @param cacheName Cache name.

Review comment:
       Please, remove the whitespace before `@param`

##########
File path: modules/core/src/main/java/org/apache/ignite/events/CacheConsistencyViolationEvent.java
##########
@@ -67,39 +68,58 @@
     private static final long serialVersionUID = 0L;
 
     /** Represents original values of entries.*/
-    final Map<Object, Map<ClusterNode, EntryInfo>> entries;
+    final Map<?, Map<ClusterNode, EntryInfo>> entries;
+
+    /** Fixed entries. */
+    final Map<?, ?> fixed;
 
     /** Cache name. */
     final String cacheName;
 
+    /** Strategy. */
+    final ReadRepairStrategy strategy;
+
     /**
      * Creates a new instance of CacheConsistencyViolationEvent.
-     *
-     * @param cacheName Cache name.
+     *  @param cacheName Cache name.
      * @param node Local node.
      * @param msg Event message.
      * @param entries Collection of original entries.
+     * @param fixed Collection of fixed entries.
+     * @param strategy
      */
     public CacheConsistencyViolationEvent(
         String cacheName,
         ClusterNode node,
         String msg,
-        Map<Object, Map<ClusterNode, EntryInfo>> entries) {
+        Map<?, Map<ClusterNode, EntryInfo>> entries,
+        Map<?, ?> fixed, ReadRepairStrategy strategy) {

Review comment:
       Move the `strategy` definition on new line. 

##########
File path: modules/core/src/main/java/org/apache/ignite/events/CacheConsistencyViolationEvent.java
##########
@@ -67,39 +68,58 @@
     private static final long serialVersionUID = 0L;
 
     /** Represents original values of entries.*/
-    final Map<Object, Map<ClusterNode, EntryInfo>> entries;
+    final Map<?, Map<ClusterNode, EntryInfo>> entries;
+
+    /** Fixed entries. */
+    final Map<?, ?> fixed;
 
     /** Cache name. */
     final String cacheName;
 
+    /** Strategy. */
+    final ReadRepairStrategy strategy;
+
     /**
      * Creates a new instance of CacheConsistencyViolationEvent.
-     *
-     * @param cacheName Cache name.
+     *  @param cacheName Cache name.
      * @param node Local node.
      * @param msg Event message.
      * @param entries Collection of original entries.
+     * @param fixed Collection of fixed entries.
+     * @param strategy

Review comment:
       Add a short doc with a dot in the end of line.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java
##########
@@ -92,7 +93,7 @@ public CacheOperationContext(
         boolean noRetries,
         @Nullable Byte dataCenterId,
         boolean recovery,
-        boolean readRepair,
+        ReadRepairStrategy readRepairStrategy,

Review comment:
       Please mark it with `@Nullable`

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
##########
@@ -294,7 +294,7 @@ public GridCacheProxyImpl(
                         false,
                         null,
                         false,
-                        false,
+                        null,

Review comment:
       Here and below, let's replace it with `CacheOperationContext.keepBinary()`

##########
File path: modules/core/src/main/java/org/apache/ignite/events/CacheConsistencyViolationEvent.java
##########
@@ -67,39 +68,58 @@
     private static final long serialVersionUID = 0L;
 
     /** Represents original values of entries.*/
-    final Map<Object, Map<ClusterNode, EntryInfo>> entries;
+    final Map<?, Map<ClusterNode, EntryInfo>> entries;
+
+    /** Fixed entries. */
+    final Map<?, ?> fixed;
 
     /** Cache name. */
     final String cacheName;
 
+    /** Strategy. */
+    final ReadRepairStrategy strategy;
+
     /**
      * Creates a new instance of CacheConsistencyViolationEvent.
-     *
-     * @param cacheName Cache name.
+     *  @param cacheName Cache name.
      * @param node Local node.
      * @param msg Event message.
      * @param entries Collection of original entries.
+     * @param fixed Collection of fixed entries.
+     * @param strategy
      */
     public CacheConsistencyViolationEvent(
         String cacheName,
         ClusterNode node,
         String msg,
-        Map<Object, Map<ClusterNode, EntryInfo>> entries) {
+        Map<?, Map<ClusterNode, EntryInfo>> entries,
+        Map<?, ?> fixed, ReadRepairStrategy strategy) {
         super(node, msg, EVT_CONSISTENCY_VIOLATION);
 
         this.cacheName = cacheName;
         this.entries = entries;
+        this.fixed = fixed;
+        this.strategy = strategy;
     }
 
     /**
      * Returns a mapping of keys to a collection of original entries.
      *
      * @return Collection of original entries.
      */
-    public Map<Object, Map<ClusterNode, EntryInfo>> getEntries() {
+    public Map<?, Map<ClusterNode, EntryInfo>> getEntries() {

Review comment:
       Usage of this method applies `Object key` for accessing entries. Why do we need this change?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] anton-vinogradov commented on a change in pull request #9661: IGNITE-15330 Read Repair should support strategies

Posted by GitBox <gi...@apache.org>.
anton-vinogradov commented on a change in pull request #9661:
URL: https://github.com/apache/ignite/pull/9661#discussion_r775893488



##########
File path: modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencyTest.java
##########
@@ -187,36 +215,48 @@ public void testRepairNonExistentCache() throws Exception {
     /**
      *
      */
-    private void readRepairTx(AtomicInteger brokenParts, String cacheName) {
+    private void readRepair(AtomicInteger brokenParts, String cacheName, Integer fixesPerEntry) {
         for (int i = 0; i < PARTITIONS; i++) {
-            assertEquals(EXIT_CODE_OK, execute("--consistency", "repair", cacheName, String.valueOf(i)));
+            assertEquals(EXIT_CODE_OK, execute("--consistency", "repair", cacheName, String.valueOf(i), strategy.toString()));
             assertContains(log, testOut.toString(), CONSISTENCY_VIOLATIONS_FOUND);
-            assertContains(log, testOut.toString(), "[found=1, fixed=1");
+            assertContains(log, testOut.toString(), "[found=1, fixed=" + (fixesPerEntry != null ? fixesPerEntry.toString() : ""));
 
             assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify"));
 
-            brokenParts.decrementAndGet();
+            if (fixesPerEntry != null)

Review comment:
       This test does not check the strategies work.
   It checks `idle_verify` -> `repair` -> `idle_verify`.
   `MAJORITY/LWW` may fix all entries, may fix some, may fix none, we only check they cause no issues when started from `control.sh`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] anton-vinogradov commented on a change in pull request #9661: IGNITE-15330 Read Repair should support strategies

Posted by GitBox <gi...@apache.org>.
anton-vinogradov commented on a change in pull request #9661:
URL: https://github.com/apache/ignite/pull/9661#discussion_r775970523



##########
File path: modules/core/src/main/java/org/apache/ignite/cache/ReadRepairStrategy.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache;
+
+import org.apache.ignite.IgniteCache;
+
+/**
+ * Read repair strategies.

Review comment:
       Javadoc updated. 
   `ReadRepairStrategy.MAJORITY` changed to `ReadRepairStrategy.RELATIVE_MAJORITY`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] anton-vinogradov commented on a change in pull request #9661: IGNITE-15330 Read Repair should support strategies

Posted by GitBox <gi...@apache.org>.
anton-vinogradov commented on a change in pull request #9661:
URL: https://github.com/apache/ignite/pull/9661#discussion_r776385526



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
##########
@@ -4896,13 +4901,13 @@ protected V get(
             /*skip vals*/false,
             needVer);
 
-        if (readRepair) {
+        if (readRepairStrategy != null) {
             CacheOperationContext opCtx = ctx.operationContextPerCall();
 
             return getWithRepairAsync(
                 fut,
                 () -> repairAsync(key, opCtx, false),
-                () -> repairableGetAsync(key, deserializeBinary, needVer, readRepair));
+                () -> repairableGetAsync(key, deserializeBinary, needVer, readRepairStrategy));

Review comment:
       such optimization will require refactoring when we need this info in addition to the knowledge that strategy is not null.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] anton-vinogradov commented on a change in pull request #9661: IGNITE-15330 Read Repair should support strategies

Posted by GitBox <gi...@apache.org>.
anton-vinogradov commented on a change in pull request #9661:
URL: https://github.com/apache/ignite/pull/9661#discussion_r776383253



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
##########
@@ -699,9 +701,9 @@ public void onKernalStop() {
             /*skip values*/true,
             false);
 
-        boolean readRepair = opCtx != null && opCtx.readRepair();

Review comment:
       We need this to detect strategy inside checkOnly future, as we discussed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] timoninmaxim commented on a change in pull request #9661: IGNITE-15330 Read Repair should support strategies

Posted by GitBox <gi...@apache.org>.
timoninmaxim commented on a change in pull request #9661:
URL: https://github.com/apache/ignite/pull/9661#discussion_r785886169



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/IgniteConsistencyViolationException.java
##########
@@ -17,20 +17,35 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.near.consistency;
 
+import java.util.Set;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 
 /**
- * Possible consistency violation exception.
- * Each such case should be rechecked under locks.
+ * Consistency violation exception.
  */
 public class IgniteConsistencyViolationException extends IgniteCheckedException {
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** Inconsistent entries keys. */
+    private final Set<KeyCacheObject> keys;
+
+    /**
+     * @param keys Keys.
+     */
+    public IgniteConsistencyViolationException(Set<KeyCacheObject> keys) {
+        super("Distributed cache consistency violation detected.");
+
+        assert keys != null && !keys.isEmpty();
+
+        this.keys = keys;
+    }
+
     /**
-     * @param msg Message.
+     * Found but unrepaired (because of chosen strategy) inconsistent entries keys.

Review comment:
       A little bit misleading. Keys aren't repaired because Ignite hasn't tried to repair them. Ignite throws `IrreparableException` if it didn't repair keys because of strategy, don't it?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] anton-vinogradov commented on a change in pull request #9661: IGNITE-15330 Read Repair should support strategies

Posted by GitBox <gi...@apache.org>.
anton-vinogradov commented on a change in pull request #9661:
URL: https://github.com/apache/ignite/pull/9661#discussion_r785892099



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/IgniteConsistencyViolationException.java
##########
@@ -17,20 +17,35 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.near.consistency;
 
+import java.util.Set;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 
 /**
- * Possible consistency violation exception.
- * Each such case should be rechecked under locks.
+ * Consistency violation exception.
  */
 public class IgniteConsistencyViolationException extends IgniteCheckedException {
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** Inconsistent entries keys. */
+    private final Set<KeyCacheObject> keys;
+
+    /**
+     * @param keys Keys.
+     */
+    public IgniteConsistencyViolationException(Set<KeyCacheObject> keys) {
+        super("Distributed cache consistency violation detected.");
+
+        assert keys != null && !keys.isEmpty();
+
+        this.keys = keys;
+    }
+
     /**
-     * @param msg Message.
+     * Found but unrepaired (because of chosen strategy) inconsistent entries keys.

Review comment:
       Possible cases are 
   1) CHECK_ONLY - always adds all broken keys to this set.
   2) LWW and RELATIVE_MAJORITY, where keys can not be repaired.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] anton-vinogradov commented on a change in pull request #9661: IGNITE-15330 Read Repair should support strategies

Posted by GitBox <gi...@apache.org>.
anton-vinogradov commented on a change in pull request #9661:
URL: https://github.com/apache/ignite/pull/9661#discussion_r781121353



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
##########
@@ -4833,6 +4835,9 @@ private boolean clearLocally0(K key, boolean readers) {
                 deserializeBinary,
                 needVer);
         }
+        catch (IgniteIrreparableConsistencyViolationException e) {
+            throw e;
+        }
         catch (IgniteConsistencyViolationException e) {
             repairAsync(key, ctx.operationContextPerCall(), false).get();

Review comment:
       Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] timoninmaxim commented on a change in pull request #9661: IGNITE-15330 Read Repair should support strategies

Posted by GitBox <gi...@apache.org>.
timoninmaxim commented on a change in pull request #9661:
URL: https://github.com/apache/ignite/pull/9661#discussion_r775931266



##########
File path: modules/core/src/main/java/org/apache/ignite/cache/ReadRepairStrategy.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache;
+
+import org.apache.ignite.IgniteCache;
+
+/**
+ * Read repair strategies.

Review comment:
       yes, you mentioned that some strategies under some circumstances can throw Irreparable exception. Let's describe it. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] anton-vinogradov commented on a change in pull request #9661: IGNITE-15330 Read Repair should support strategies

Posted by GitBox <gi...@apache.org>.
anton-vinogradov commented on a change in pull request #9661:
URL: https://github.com/apache/ignite/pull/9661#discussion_r776682070



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridCompoundReadRepairFuture.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near.consistency;
+
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.lang.IgniteInClosure;
+
+/**
+ * Compound future that represents the result of the external fixes for some keys.
+ */
+public class GridCompoundReadRepairFuture extends GridFutureAdapter<Void> implements IgniteInClosure<IgniteInternalFuture<Void>> {
+    /** Initialization flag. */
+    private static final int INIT_FLAG = 0x1;
+
+    /** Flags updater. */
+    private static final AtomicIntegerFieldUpdater<GridCompoundReadRepairFuture> FLAGS_UPD =
+        AtomicIntegerFieldUpdater.newUpdater(GridCompoundReadRepairFuture.class, "initFlag");
+
+    /** Listener calls updater. */
+    private static final AtomicIntegerFieldUpdater<GridCompoundReadRepairFuture> LSNR_CALLS_UPD =
+        AtomicIntegerFieldUpdater.newUpdater(GridCompoundReadRepairFuture.class, "lsnrCalls");
+
+    /** Initialization flag. Updated via {@link #FLAGS_UPD}. */
+    private volatile int initFlag;
+
+    /** Listener calls. */
+    private volatile int lsnrCalls;
+
+    /** Count of compounds in the future. */
+    private volatile int size;
+
+    /** Keys. */
+    private volatile Collection<Object> keys;
+
+    /** Irreparable Keys. */
+    private volatile Collection<Object> irreparableKeys;
+
+    /**
+     * @param fut Future.
+     */
+    public void add(IgniteInternalFuture<Void> fut) {
+        size++; // All additions are from the same thread.
+
+        fut.listen(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void apply(IgniteInternalFuture<Void> fut) {
+        Throwable e = fut.error();
+
+        if (e != null) {
+            if (e instanceof IgniteConsistencyViolationException) {
+                synchronized (this) {

Review comment:
       Simplified as discussed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] anton-vinogradov merged pull request #9661: IGNITE-15330 Read Repair should support strategies

Posted by GitBox <gi...@apache.org>.
anton-vinogradov merged pull request #9661:
URL: https://github.com/apache/ignite/pull/9661


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] timoninmaxim commented on a change in pull request #9661: IGNITE-15330 Read Repair should support strategies

Posted by GitBox <gi...@apache.org>.
timoninmaxim commented on a change in pull request #9661:
URL: https://github.com/apache/ignite/pull/9661#discussion_r775962307



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java
##########
@@ -92,7 +93,7 @@ public CacheOperationContext(
         boolean noRetries,
         @Nullable Byte dataCenterId,
         boolean recovery,
-        boolean readRepair,
+        ReadRepairStrategy readRepairStrategy,

Review comment:
       There is already `@Nullable` in the signature. Let's keep it consistent




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] anton-vinogradov commented on a change in pull request #9661: IGNITE-15330 Read Repair should support strategies

Posted by GitBox <gi...@apache.org>.
anton-vinogradov commented on a change in pull request #9661:
URL: https://github.com/apache/ignite/pull/9661#discussion_r775856942



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
##########
@@ -294,7 +294,7 @@ public GridCacheProxyImpl(
                         false,
                         null,
                         false,
-                        false,
+                        null,

Review comment:
       Let's refactor this at another issue if necessary.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] anton-vinogradov commented on a change in pull request #9661: IGNITE-15330 Read Repair should support strategies

Posted by GitBox <gi...@apache.org>.
anton-vinogradov commented on a change in pull request #9661:
URL: https://github.com/apache/ignite/pull/9661#discussion_r782909416



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java
##########
@@ -232,7 +233,10 @@ public void setCacheManager(org.apache.ignite.cache.CacheManager cacheMgr) {
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteCache<K, V> withReadRepair() {
+    @Override public IgniteCache<K, V> withReadRepair(ReadRepairStrategy strategy) {
+        if (strategy == null)

Review comment:
       Ok 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] anton-vinogradov commented on a change in pull request #9661: IGNITE-15330 Read Repair should support strategies

Posted by GitBox <gi...@apache.org>.
anton-vinogradov commented on a change in pull request #9661:
URL: https://github.com/apache/ignite/pull/9661#discussion_r776764408



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/IgniteIrreparableConsistencyViolationException.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near.consistency;
+
+import java.util.Collection;
+
+/**
+ * Irreparable consistency violation exception.
+ */
+public class IgniteIrreparableConsistencyViolationException extends IgniteConsistencyViolationException {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Irreparable keys. */
+    private final Collection<?> irreparableKeys;
+
+    /**
+     * @param keys            Keys.
+     * @param irreparableKeys Irreparable keys.
+     */
+    public IgniteIrreparableConsistencyViolationException(Collection<?> keys, Collection<?> irreparableKeys) {
+        super(keys);

Review comment:
       Separated the exceptions.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] anton-vinogradov commented on a change in pull request #9661: IGNITE-15330 Read Repair should support strategies

Posted by GitBox <gi...@apache.org>.
anton-vinogradov commented on a change in pull request #9661:
URL: https://github.com/apache/ignite/pull/9661#discussion_r775855272



##########
File path: modules/core/src/main/java/org/apache/ignite/events/CacheConsistencyViolationEvent.java
##########
@@ -67,39 +68,58 @@
     private static final long serialVersionUID = 0L;
 
     /** Represents original values of entries.*/
-    final Map<Object, Map<ClusterNode, EntryInfo>> entries;
+    final Map<?, Map<ClusterNode, EntryInfo>> entries;
+
+    /** Fixed entries. */
+    final Map<?, ?> fixed;
 
     /** Cache name. */
     final String cacheName;
 
+    /** Strategy. */
+    final ReadRepairStrategy strategy;
+
     /**
      * Creates a new instance of CacheConsistencyViolationEvent.
-     *
-     * @param cacheName Cache name.
+     *  @param cacheName Cache name.
      * @param node Local node.
      * @param msg Event message.
      * @param entries Collection of original entries.
+     * @param fixed Collection of fixed entries.
+     * @param strategy
      */
     public CacheConsistencyViolationEvent(
         String cacheName,
         ClusterNode node,
         String msg,
-        Map<Object, Map<ClusterNode, EntryInfo>> entries) {
+        Map<?, Map<ClusterNode, EntryInfo>> entries,
+        Map<?, ?> fixed, ReadRepairStrategy strategy) {
         super(node, msg, EVT_CONSISTENCY_VIOLATION);
 
         this.cacheName = cacheName;
         this.entries = entries;
+        this.fixed = fixed;
+        this.strategy = strategy;
     }
 
     /**
      * Returns a mapping of keys to a collection of original entries.
      *
      * @return Collection of original entries.
      */
-    public Map<Object, Map<ClusterNode, EntryInfo>> getEntries() {
+    public Map<?, Map<ClusterNode, EntryInfo>> getEntries() {

Review comment:
       Looks like "?" is not required in this class at all.
   Have no clue why it was required before, but this was made because of compilation issues.
   Thanks for pointing this.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] anton-vinogradov commented on a change in pull request #9661: IGNITE-15330 Read Repair should support strategies

Posted by GitBox <gi...@apache.org>.
anton-vinogradov commented on a change in pull request #9661:
URL: https://github.com/apache/ignite/pull/9661#discussion_r782913455



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
##########
@@ -699,12 +701,12 @@ public void onKernalStop() {
             /*skip values*/true,
             false);
 
-        boolean readRepair = opCtx != null && opCtx.readRepair();
+        ReadRepairStrategy readRepairStrategy = opCtx != null ? opCtx.readRepairStrategy() : null;
 
-        if (readRepair)
+        if (readRepairStrategy != null)
             return getWithRepairAsync(
                 fut,
-                () -> repairAsync(key, opCtx, true),
+                (ks) -> repairAsync(ks, opCtx, true),

Review comment:
       1) This will increase code complexity. 
   The same way should be used everywhere.
   2) Now we use keys from `IgniteConsistencyViolationException` where keys are `KeyCacheObject's`, while `K key` can be any plain object (that's not a problem now, but can lead to problems in the future).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] anton-vinogradov commented on a change in pull request #9661: IGNITE-15330 Read Repair should support strategies

Posted by GitBox <gi...@apache.org>.
anton-vinogradov commented on a change in pull request #9661:
URL: https://github.com/apache/ignite/pull/9661#discussion_r776396247



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridCompoundReadRepairFuture.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near.consistency;
+
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.lang.IgniteInClosure;
+
+/**
+ * Compound future that represents the result of the external fixes for some keys.
+ */
+public class GridCompoundReadRepairFuture extends GridFutureAdapter<Void> implements IgniteInClosure<IgniteInternalFuture<Void>> {
+    /** Initialization flag. */
+    private static final int INIT_FLAG = 0x1;
+
+    /** Flags updater. */
+    private static final AtomicIntegerFieldUpdater<GridCompoundReadRepairFuture> FLAGS_UPD =
+        AtomicIntegerFieldUpdater.newUpdater(GridCompoundReadRepairFuture.class, "initFlag");
+
+    /** Listener calls updater. */
+    private static final AtomicIntegerFieldUpdater<GridCompoundReadRepairFuture> LSNR_CALLS_UPD =
+        AtomicIntegerFieldUpdater.newUpdater(GridCompoundReadRepairFuture.class, "lsnrCalls");
+
+    /** Initialization flag. Updated via {@link #FLAGS_UPD}. */
+    private volatile int initFlag;
+
+    /** Listener calls. */
+    private volatile int lsnrCalls;
+
+    /** Count of compounds in the future. */
+    private volatile int size;
+
+    /** Keys. */
+    private volatile Collection<Object> keys;
+
+    /** Irreparable Keys. */
+    private volatile Collection<Object> irreparableKeys;
+
+    /**
+     * @param fut Future.
+     */
+    public void add(IgniteInternalFuture<Void> fut) {
+        size++; // All additions are from the same thread.
+
+        fut.listen(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void apply(IgniteInternalFuture<Void> fut) {
+        Throwable e = fut.error();
+
+        if (e != null) {
+            if (e instanceof IgniteConsistencyViolationException) {
+                synchronized (this) {
+                    Collection<?> keys = ((IgniteConsistencyViolationException)e).keys();
+
+                    if (this.keys == null)
+                        this.keys = new GridConcurrentHashSet<>();
+
+                    this.keys.addAll(keys);
+
+                    if (e instanceof IgniteIrreparableConsistencyViolationException) {
+                        Collection<?> irreparableKeys = ((IgniteIrreparableConsistencyViolationException)e).irreparableKeys();
+
+                        if (this.irreparableKeys == null)
+                            this.irreparableKeys = new GridConcurrentHashSet<>();
+
+                        this.irreparableKeys.addAll(irreparableKeys);
+                    }
+                }
+            }
+            else
+                onDone(e);
+        }
+
+        LSNR_CALLS_UPD.incrementAndGet(this);
+
+        checkComplete();
+    }
+
+    /**
+     * Mark this future as initialized.
+     */
+    public final void markInitialized() {
+        if (FLAGS_UPD.compareAndSet(this, 0, INIT_FLAG))

Review comment:
       Simplified!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] anton-vinogradov commented on a change in pull request #9661: IGNITE-15330 Read Repair should support strategies

Posted by GitBox <gi...@apache.org>.
anton-vinogradov commented on a change in pull request #9661:
URL: https://github.com/apache/ignite/pull/9661#discussion_r775873764



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
##########
@@ -4833,6 +4835,9 @@ private boolean clearLocally0(K key, boolean readers) {
                 deserializeBinary,
                 needVer);
         }
+        catch (IgniteIrreparableConsistencyViolationException e) {
+            throw e;
+        }
         catch (IgniteConsistencyViolationException e) {
             repairAsync(key, ctx.operationContextPerCall(), false).get();

Review comment:
       IGNITE-16224




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] anton-vinogradov commented on a change in pull request #9661: IGNITE-15330 Read Repair should support strategies

Posted by GitBox <gi...@apache.org>.
anton-vinogradov commented on a change in pull request #9661:
URL: https://github.com/apache/ignite/pull/9661#discussion_r775875202



##########
File path: modules/core/src/main/java/org/apache/ignite/events/CacheConsistencyViolationEvent.java
##########
@@ -67,39 +68,58 @@
     private static final long serialVersionUID = 0L;
 
     /** Represents original values of entries.*/
-    final Map<Object, Map<ClusterNode, EntryInfo>> entries;
+    final Map<?, Map<ClusterNode, EntryInfo>> entries;

Review comment:
       Looks like "?" is not required in this class at all.
   Have no clue why it was required before, but this was made because of compilation issues.
   Thanks for pointing this.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] anton-vinogradov commented on a change in pull request #9661: IGNITE-15330 Read Repair should support strategies

Posted by GitBox <gi...@apache.org>.
anton-vinogradov commented on a change in pull request #9661:
URL: https://github.com/apache/ignite/pull/9661#discussion_r782903608



##########
File path: modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencyTest.java
##########
@@ -62,6 +66,23 @@
     /** Partitions. */
     private static final int PARTITIONS = 32;
 
+    /** */
+    @Parameterized.Parameters(name = "strategy={0}")
+    public static Iterable<Object[]> data() {
+        List<Object[]> res = new ArrayList<>();
+
+        for (ReadRepairStrategy strategy : ReadRepairStrategy.values())
+            res.add(new Object[] {strategy});
+
+        return res;

Review comment:
       Yes, but it will be hard to extend params number in the future.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] anton-vinogradov commented on a change in pull request #9661: IGNITE-15330 Read Repair should support strategies

Posted by GitBox <gi...@apache.org>.
anton-vinogradov commented on a change in pull request #9661:
URL: https://github.com/apache/ignite/pull/9661#discussion_r782985651



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridCompoundReadRepairFuture.java
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near.consistency;
+
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.lang.IgniteInClosure;
+
+/**
+ * Compound future that represents the result of the external fixes for some keys.
+ */
+public class GridCompoundReadRepairFuture extends GridFutureAdapter<Void> implements IgniteInClosure<IgniteInternalFuture<Void>> {
+    /** Listener calls updater. */
+    private static final AtomicIntegerFieldUpdater<GridCompoundReadRepairFuture> LSNR_CALLS_UPD =
+        AtomicIntegerFieldUpdater.newUpdater(GridCompoundReadRepairFuture.class, "lsnrCalls");
+
+    /** Initialized. */
+    private volatile boolean inited;
+
+    /** Listener calls. */
+    private volatile int lsnrCalls;
+
+    /** Count of compounds in the future. */
+    private volatile int size;
+
+    /** Irreparable Keys. */
+    private volatile Collection<Object> irreparableKeys;

Review comment:
       `irreparableKeys` are that type that was requested (depends on `keepBinary` flag).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] anton-vinogradov commented on a change in pull request #9661: IGNITE-15330 Read Repair should support strategies

Posted by GitBox <gi...@apache.org>.
anton-vinogradov commented on a change in pull request #9661:
URL: https://github.com/apache/ignite/pull/9661#discussion_r785892099



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/IgniteConsistencyViolationException.java
##########
@@ -17,20 +17,35 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.near.consistency;
 
+import java.util.Set;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 
 /**
- * Possible consistency violation exception.
- * Each such case should be rechecked under locks.
+ * Consistency violation exception.
  */
 public class IgniteConsistencyViolationException extends IgniteCheckedException {
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** Inconsistent entries keys. */
+    private final Set<KeyCacheObject> keys;
+
+    /**
+     * @param keys Keys.
+     */
+    public IgniteConsistencyViolationException(Set<KeyCacheObject> keys) {
+        super("Distributed cache consistency violation detected.");
+
+        assert keys != null && !keys.isEmpty();
+
+        this.keys = keys;
+    }
+
     /**
-     * @param msg Message.
+     * Found but unrepaired (because of chosen strategy) inconsistent entries keys.

Review comment:
       Possible cases are 
   1) CHECK_ONLY - always add all broken keys to this set.
   2) LWW and RELATIVE_MAJORITY, where some or all keys can not be repaired.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] anton-vinogradov commented on a change in pull request #9661: IGNITE-15330 Read Repair should support strategies

Posted by GitBox <gi...@apache.org>.
anton-vinogradov commented on a change in pull request #9661:
URL: https://github.com/apache/ignite/pull/9661#discussion_r775896968



##########
File path: modules/core/src/main/java/org/apache/ignite/cache/ReadRepairStrategy.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache;
+
+import org.apache.ignite.IgniteCache;
+
+/**
+ * Read repair strategies.

Review comment:
       Difference listed at enum values javadocs.
   Performace will be tuned at IGNITE-15606
   Do we need to write something else?
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] anton-vinogradov commented on a change in pull request #9661: IGNITE-15330 Read Repair should support strategies

Posted by GitBox <gi...@apache.org>.
anton-vinogradov commented on a change in pull request #9661:
URL: https://github.com/apache/ignite/pull/9661#discussion_r775893488



##########
File path: modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencyTest.java
##########
@@ -187,36 +215,48 @@ public void testRepairNonExistentCache() throws Exception {
     /**
      *
      */
-    private void readRepairTx(AtomicInteger brokenParts, String cacheName) {
+    private void readRepair(AtomicInteger brokenParts, String cacheName, Integer fixesPerEntry) {
         for (int i = 0; i < PARTITIONS; i++) {
-            assertEquals(EXIT_CODE_OK, execute("--consistency", "repair", cacheName, String.valueOf(i)));
+            assertEquals(EXIT_CODE_OK, execute("--consistency", "repair", cacheName, String.valueOf(i), strategy.toString()));
             assertContains(log, testOut.toString(), CONSISTENCY_VIOLATIONS_FOUND);
-            assertContains(log, testOut.toString(), "[found=1, fixed=1");
+            assertContains(log, testOut.toString(), "[found=1, fixed=" + (fixesPerEntry != null ? fixesPerEntry.toString() : ""));
 
             assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify"));
 
-            brokenParts.decrementAndGet();
+            if (fixesPerEntry != null)

Review comment:
       This test does not check the strategies work.
   It checks `idle_verify` -> `repair -> `idle_verify`.
   `MAJORITY/LWW` may fix all entries, may fix some, may fix none, we only check they cause no issues when started from `control.sh`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] anton-vinogradov commented on a change in pull request #9661: IGNITE-15330 Read Repair should support strategies

Posted by GitBox <gi...@apache.org>.
anton-vinogradov commented on a change in pull request #9661:
URL: https://github.com/apache/ignite/pull/9661#discussion_r776381171



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java
##########
@@ -232,7 +233,7 @@ public void setCacheManager(org.apache.ignite.cache.CacheManager cacheMgr) {
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteCache<K, V> withReadRepair() {
+    @Override public IgniteCache<K, V> withReadRepair(ReadRepairStrategy strategy) {

Review comment:
       Added exception!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] anton-vinogradov commented on a change in pull request #9661: IGNITE-15330 Read Repair should support strategies

Posted by GitBox <gi...@apache.org>.
anton-vinogradov commented on a change in pull request #9661:
URL: https://github.com/apache/ignite/pull/9661#discussion_r775847641



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java
##########
@@ -92,7 +93,7 @@ public CacheOperationContext(
         boolean noRetries,
         @Nullable Byte dataCenterId,
         boolean recovery,
-        boolean readRepair,
+        ReadRepairStrategy readRepairStrategy,

Review comment:
       Everything is nullable except primitives :) 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] anton-vinogradov commented on a change in pull request #9661: IGNITE-15330 Read Repair should support strategies

Posted by GitBox <gi...@apache.org>.
anton-vinogradov commented on a change in pull request #9661:
URL: https://github.com/apache/ignite/pull/9661#discussion_r782904902



##########
File path: modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencyTest.java
##########
@@ -122,13 +143,17 @@ private void testAtomicAndTx(boolean incVal) throws Exception {
         assertContains(log, testOut.toString(),
             "conflict partitions has been found: [counterConflicts=0, hashConflicts=" + brokenParts.get());
 
-        readRepairTx(brokenParts, txCacheName);
+        Integer fixesPerEntry = fixesPerEntry();
 
-        assertEquals(PARTITIONS, brokenParts.get()); // Half fixed.
+        readRepair(brokenParts, txCacheName, fixesPerEntry);

Review comment:
       The test will be simplified at IGNITE-15329




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] anton-vinogradov commented on a change in pull request #9661: IGNITE-15330 Read Repair should support strategies

Posted by GitBox <gi...@apache.org>.
anton-vinogradov commented on a change in pull request #9661:
URL: https://github.com/apache/ignite/pull/9661#discussion_r776378525



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
##########
@@ -191,7 +192,7 @@ public GridDistributedCacheEntry entryExx(
         final CacheOperationContext opCtx = ctx.operationContextPerCall();
 
         final boolean recovery = opCtx != null && opCtx.recovery();
-        final boolean readRepair = opCtx != null && opCtx.readRepair();
+        final ReadRepairStrategy readRepairStrategy = opCtx != null ? opCtx.readRepairStrategy() : null;

Review comment:
       We use readRepairStrategy as a param at `GridNearTxLocal#getAllAsync`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] anton-vinogradov commented on a change in pull request #9661: IGNITE-15330 Read Repair should support strategies

Posted by GitBox <gi...@apache.org>.
anton-vinogradov commented on a change in pull request #9661:
URL: https://github.com/apache/ignite/pull/9661#discussion_r790860167



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairFuture.java
##########
@@ -105,40 +189,146 @@ public GridNearReadRepairFuture(
                         else if (compareRes < 0)
                             fixedMap.put(key, newestRes);
                         else if (compareRes == 0) {
-                            CacheObjectAdapter candidateVal = candidateRes.value();
-                            CacheObjectAdapter newestVal = newestRes.value();
-
-                            try {
-                                byte[] candidateBytes = candidateVal.valueBytes(ctx.cacheObjectContext());
-                                byte[] newestBytes = newestVal.valueBytes(ctx.cacheObjectContext());
+                            CacheObject candidateVal = candidateRes.value();
+                            CacheObject newestVal = newestRes.value();
 
-                                if (!Arrays.equals(candidateBytes, newestBytes))
-                                    fixedMap.put(key, newestRes); // Same version, fixing values inconsistency.
-                            }
-                            catch (IgniteCheckedException e) {
-                                onDone(e);
+                            byte[] candidateBytes = candidateVal.valueBytes(ctx.cacheObjectContext());
+                            byte[] newestBytes = newestVal.valueBytes(ctx.cacheObjectContext());
 
-                                return;
-                            }
+                            if (!Arrays.equals(candidateBytes, newestBytes))
+                                irreparableSet.add(key);
                         }
                     }
                 }
                 else if (newestRes != null)
-                    fixedMap.put(key, newestRes); // Existing data wins.
+                    irreparableSet.add(key); // Impossible to detect latest between existing and null.
             }
         }
 
         assert !fixedMap.containsValue(null) : "null should never be considered as a fix";
 
-        if (!fixedMap.isEmpty()) {
-            tx.finishFuture().listen(future -> {
-                TransactionState state = tx.state();
+        if (!irreparableSet.isEmpty())
+            throw new IgniteConsistencyViolationException(irreparableSet);
+
+        return fixedMap;
+    }
+
+    /**
+     *
+     */
+    public Map<KeyCacheObject, EntryGetResult> fixWithPrimary(Collection<KeyCacheObject> inconsistentKeys) {
+        Map<KeyCacheObject, EntryGetResult> fixedMap = new HashMap<>(inconsistentKeys.size());
+
+        for (GridPartitionedGetFuture<KeyCacheObject, EntryGetResult> fut : futs.values()) {
+            for (KeyCacheObject key : fut.keys()) {
+                if (!inconsistentKeys.contains(key) ||
+                    !primaries.get(key).equals(fut.affNode()))
+                    continue;
+
+                fixedMap.put(key, fut.result().get(key));
+            }
+        }
+
+        return fixedMap;
+    }
+
+    /**
+     *
+     */
+    public Map<KeyCacheObject, EntryGetResult> fixWithRemove(Collection<KeyCacheObject> inconsistentKeys) {
+        Map<KeyCacheObject, EntryGetResult> fixedMap = new HashMap<>(inconsistentKeys.size());
+
+        for (KeyCacheObject key : inconsistentKeys)
+            fixedMap.put(key, null);
+
+        return fixedMap;
+    }
+
+    /**
+     *
+     */
+    public Map<KeyCacheObject, EntryGetResult> fixWithMajority(Collection<KeyCacheObject> inconsistentKeys)
+        throws IgniteCheckedException {
+        /** */
+        class ByteArrayWrapper {
+            final byte[] arr;
+
+            /** */
+            public ByteArrayWrapper(byte[] arr) {
+                this.arr = arr;
+            }
+
+            /** */
+            @Override public boolean equals(Object o) {
+                return Arrays.equals(arr, ((ByteArrayWrapper)o).arr);
+            }
 
-                if (state == TransactionState.COMMITTED) // Explicit tx may fix the values but become rolled back later.
-                    recordConsistencyViolation(fixedMap.keySet(), fixedMap);
-            });
+            /** */
+            @Override public int hashCode() {
+                return Arrays.hashCode(arr);
+            }
         }
 
-        onDone(fixedMap);
+        Set<KeyCacheObject> irreparableSet = new HashSet<>(inconsistentKeys.size());
+        Map<KeyCacheObject, EntryGetResult> fixedMap = new HashMap<>(inconsistentKeys.size());
+
+        for (KeyCacheObject inconsistentKey : inconsistentKeys) {
+            Map<T2<ByteArrayWrapper, GridCacheVersion>, T2<EntryGetResult, Integer>> cntMap = new HashMap<>();
+
+            for (GridPartitionedGetFuture<KeyCacheObject, EntryGetResult> fut : futs.values()) {
+                if (!fut.keys().contains(inconsistentKey))
+                    continue;

Review comment:
       Not every mapped node is an owner for every checked key. 
   We have a greedy mapping (all nodes own at least one key from the checked list), so, it's ok that the node has no entry for each key, this just means it's not an owner.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] anton-vinogradov commented on a change in pull request #9661: IGNITE-15330 Read Repair should support strategies

Posted by GitBox <gi...@apache.org>.
anton-vinogradov commented on a change in pull request #9661:
URL: https://github.com/apache/ignite/pull/9661#discussion_r790865952



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairFuture.java
##########
@@ -105,40 +189,146 @@ public GridNearReadRepairFuture(
                         else if (compareRes < 0)
                             fixedMap.put(key, newestRes);
                         else if (compareRes == 0) {
-                            CacheObjectAdapter candidateVal = candidateRes.value();
-                            CacheObjectAdapter newestVal = newestRes.value();
-
-                            try {
-                                byte[] candidateBytes = candidateVal.valueBytes(ctx.cacheObjectContext());
-                                byte[] newestBytes = newestVal.valueBytes(ctx.cacheObjectContext());
+                            CacheObject candidateVal = candidateRes.value();
+                            CacheObject newestVal = newestRes.value();
 
-                                if (!Arrays.equals(candidateBytes, newestBytes))
-                                    fixedMap.put(key, newestRes); // Same version, fixing values inconsistency.
-                            }
-                            catch (IgniteCheckedException e) {
-                                onDone(e);
+                            byte[] candidateBytes = candidateVal.valueBytes(ctx.cacheObjectContext());
+                            byte[] newestBytes = newestVal.valueBytes(ctx.cacheObjectContext());
 
-                                return;
-                            }
+                            if (!Arrays.equals(candidateBytes, newestBytes))
+                                irreparableSet.add(key);
                         }
                     }
                 }
                 else if (newestRes != null)
-                    fixedMap.put(key, newestRes); // Existing data wins.
+                    irreparableSet.add(key); // Impossible to detect latest between existing and null.
             }
         }
 
         assert !fixedMap.containsValue(null) : "null should never be considered as a fix";
 
-        if (!fixedMap.isEmpty()) {
-            tx.finishFuture().listen(future -> {
-                TransactionState state = tx.state();
+        if (!irreparableSet.isEmpty())
+            throw new IgniteConsistencyViolationException(irreparableSet);
+
+        return fixedMap;
+    }
+
+    /**
+     *
+     */
+    public Map<KeyCacheObject, EntryGetResult> fixWithPrimary(Collection<KeyCacheObject> inconsistentKeys) {
+        Map<KeyCacheObject, EntryGetResult> fixedMap = new HashMap<>(inconsistentKeys.size());
+
+        for (GridPartitionedGetFuture<KeyCacheObject, EntryGetResult> fut : futs.values()) {
+            for (KeyCacheObject key : fut.keys()) {
+                if (!inconsistentKeys.contains(key) ||
+                    !primaries.get(key).equals(fut.affNode()))
+                    continue;
+
+                fixedMap.put(key, fut.result().get(key));
+            }
+        }
+
+        return fixedMap;
+    }
+
+    /**
+     *
+     */
+    public Map<KeyCacheObject, EntryGetResult> fixWithRemove(Collection<KeyCacheObject> inconsistentKeys) {
+        Map<KeyCacheObject, EntryGetResult> fixedMap = new HashMap<>(inconsistentKeys.size());
+
+        for (KeyCacheObject key : inconsistentKeys)
+            fixedMap.put(key, null);
+
+        return fixedMap;
+    }
+
+    /**
+     *
+     */
+    public Map<KeyCacheObject, EntryGetResult> fixWithMajority(Collection<KeyCacheObject> inconsistentKeys)
+        throws IgniteCheckedException {
+        /** */
+        class ByteArrayWrapper {
+            final byte[] arr;
+
+            /** */
+            public ByteArrayWrapper(byte[] arr) {
+                this.arr = arr;
+            }
+
+            /** */
+            @Override public boolean equals(Object o) {
+                return Arrays.equals(arr, ((ByteArrayWrapper)o).arr);
+            }
 
-                if (state == TransactionState.COMMITTED) // Explicit tx may fix the values but become rolled back later.
-                    recordConsistencyViolation(fixedMap.keySet(), fixedMap);
-            });
+            /** */
+            @Override public int hashCode() {
+                return Arrays.hashCode(arr);
+            }
         }
 
-        onDone(fixedMap);
+        Set<KeyCacheObject> irreparableSet = new HashSet<>(inconsistentKeys.size());
+        Map<KeyCacheObject, EntryGetResult> fixedMap = new HashMap<>(inconsistentKeys.size());
+
+        for (KeyCacheObject inconsistentKey : inconsistentKeys) {
+            Map<T2<ByteArrayWrapper, GridCacheVersion>, T2<EntryGetResult, Integer>> cntMap = new HashMap<>();
+
+            for (GridPartitionedGetFuture<KeyCacheObject, EntryGetResult> fut : futs.values()) {
+                if (!fut.keys().contains(inconsistentKey))
+                    continue;
+
+                EntryGetResult res = fut.result().get(inconsistentKey);
+
+                ByteArrayWrapper wrapped;
+                GridCacheVersion ver;
+
+                if (res != null) {
+                    CacheObject val = res.value();
+
+                    wrapped = new ByteArrayWrapper(val.valueBytes(ctx.cacheObjectContext()));
+                    ver = res.version();
+                }
+                else {
+                    wrapped = new ByteArrayWrapper(null);
+                    ver = null;
+                }
+
+                T2<ByteArrayWrapper, GridCacheVersion> keyVer = new T2<>(wrapped, ver);
+
+                cntMap.putIfAbsent(keyVer, new T2<>(res, 0));
+
+                cntMap.compute(keyVer, (kv, ri) -> new T2<>(ri.getKey(), ri.getValue() + 1));
+            }
+
+            int[] sorted = cntMap.values().stream()
+                .map(IgniteBiTuple::getValue)
+                .sorted(Comparator.reverseOrder())
+                .mapToInt(v -> v)
+                .toArray();
+
+            int max = sorted[0];

Review comment:
       Null is also a value :) 
   Seems we checked this case at `AbstractFullSetReadRepairTest#testGetNull`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] timoninmaxim commented on a change in pull request #9661: IGNITE-15330 Read Repair should support strategies

Posted by GitBox <gi...@apache.org>.
timoninmaxim commented on a change in pull request #9661:
URL: https://github.com/apache/ignite/pull/9661#discussion_r782103156



##########
File path: modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencyTest.java
##########
@@ -62,6 +66,23 @@
     /** Partitions. */
     private static final int PARTITIONS = 32;
 
+    /** */
+    @Parameterized.Parameters(name = "strategy={0}")
+    public static Iterable<Object[]> data() {
+        List<Object[]> res = new ArrayList<>();
+
+        for (ReadRepairStrategy strategy : ReadRepairStrategy.values())
+            res.add(new Object[] {strategy});
+
+        return res;

Review comment:
       Method can be replaced with 
   
   ```
       public static Object[] data() {
           return ReadRepairStrategy.values();
       }
   ```

##########
File path: docs/_docs/tools/control-script.adoc
##########
@@ -1037,21 +1038,22 @@ The command allows to perform cache consistency check and repair (when possible)
 tab:Unix[]
 [source,shell]
 ----
-control.sh --enable-experimental --consistency repair cache-name partition
+control.sh --enable-experimental --consistency repair cache-name partition strategy
 ----
 tab:Window[]
 [source,shell]
 ----
-control.bat --enable-experimental --consistency repair cache-name partition
+control.bat --enable-experimental --consistency repair cache-name partition strategy
 ----
 --
 Parameters:
 
 [cols="1,3",opts="header"]
 |===
 | Parameter | Description
-| `cache-name`| Cache to be checked/repaired..
+| `cache-name`| Cache to be checked/repaired.
 | `partition`| Cache's partition to be checked/repaired.
+| `strategy`| Repair strategy [LWW - default, PRIMARY, MAJORITY, REMOVE, CHECK_ONLY], optional.

Review comment:
       Remove mention of `optional` here.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java
##########
@@ -232,7 +233,10 @@ public void setCacheManager(org.apache.ignite.cache.CacheManager cacheMgr) {
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteCache<K, V> withReadRepair() {
+    @Override public IgniteCache<K, V> withReadRepair(ReadRepairStrategy strategy) {
+        if (strategy == null)

Review comment:
       Use `A.notNull()` instead.

##########
File path: modules/core/src/main/java/org/apache/ignite/events/CacheConsistencyViolationEvent.java
##########
@@ -109,6 +130,15 @@ public String getCacheName() {
         return cacheName;
     }
 
+    /**
+     * Resurns strategy.

Review comment:
       s/Resurns/Returns/

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridCompoundReadRepairFuture.java
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near.consistency;
+
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.lang.IgniteInClosure;
+
+/**
+ * Compound future that represents the result of the external fixes for some keys.
+ */
+public class GridCompoundReadRepairFuture extends GridFutureAdapter<Void> implements IgniteInClosure<IgniteInternalFuture<Void>> {
+    /** Listener calls updater. */
+    private static final AtomicIntegerFieldUpdater<GridCompoundReadRepairFuture> LSNR_CALLS_UPD =
+        AtomicIntegerFieldUpdater.newUpdater(GridCompoundReadRepairFuture.class, "lsnrCalls");
+
+    /** Initialized. */
+    private volatile boolean inited;
+
+    /** Listener calls. */
+    private volatile int lsnrCalls;
+
+    /** Count of compounds in the future. */
+    private volatile int size;
+
+    /** Irreparable Keys. */
+    private volatile Collection<Object> irreparableKeys;

Review comment:
       Let's make it KeyCacheObject too?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairAbstractFuture.java
##########
@@ -254,51 +275,114 @@ else if (!canRemap)
      */
     protected abstract void reduce();
 
+    /**
+     *
+     */
+    protected Map<KeyCacheObject, EntryGetResult> check() throws IgniteCheckedException {
+        Map<KeyCacheObject, EntryGetResult> resMap = new HashMap<>(keys.size());
+        Set<KeyCacheObject> inconsistentKeys = new HashSet<>();
+
+        for (GridPartitionedGetFuture<KeyCacheObject, EntryGetResult> fut : futs.values()) {
+            for (KeyCacheObject key : fut.keys()) {
+                EntryGetResult curRes = fut.result().get(key);
+
+                if (!resMap.containsKey(key)) {
+                    resMap.put(key, curRes);
+
+                    continue;
+                }
+
+                EntryGetResult prevRes = resMap.get(key);
+
+                if (curRes != null) {
+                    if (prevRes == null || prevRes.version().compareTo(curRes.version()) != 0)
+                        inconsistentKeys.add(key);
+                    else {
+                        CacheObjectAdapter curVal = curRes.value();
+                        CacheObjectAdapter prevVal = prevRes.value();
+
+                        byte[] curBytes = curVal.valueBytes(ctx.cacheObjectContext());
+                        byte[] prevBytes = prevVal.valueBytes(ctx.cacheObjectContext());
+
+                        if (!Arrays.equals(curBytes, prevBytes))
+                            inconsistentKeys.add(key);
+
+                    }
+                }
+                else if (prevRes != null)
+                    inconsistentKeys.add(key);
+            }
+        }
+
+        if (!inconsistentKeys.isEmpty())
+            throw new IgniteConsistencyViolationException(inconsistentKeys);
+
+        return resMap;
+    }
+
     /**
      * @param fixedEntries Fixed map.
      */
     protected void recordConsistencyViolation(
-        Set<KeyCacheObject> inconsistentKeys,
-        Map<KeyCacheObject, EntryGetResult> fixedEntries
+        Collection<KeyCacheObject> inconsistentKeys,
+        Map<KeyCacheObject, EntryGetResult> fixedEntries,
+        ReadRepairStrategy strategy
     ) {
         GridEventStorageManager evtMgr = ctx.gridEvents();
 
         if (!evtMgr.isRecordable(EVT_CONSISTENCY_VIOLATION))
             return;
 
-        Map<Object, Map<ClusterNode, CacheConsistencyViolationEvent.EntryInfo>> originalMap = new HashMap<>();
+        Map<Object, Map<ClusterNode, CacheConsistencyViolationEvent.EntryInfo>> entries = new HashMap<>();
 
         for (Map.Entry<ClusterNode, GridPartitionedGetFuture<KeyCacheObject, EntryGetResult>> pair : futs.entrySet()) {
             ClusterNode node = pair.getKey();
 
             GridPartitionedGetFuture<KeyCacheObject, EntryGetResult> fut = pair.getValue();
 
-            for (Map.Entry<KeyCacheObject, EntryGetResult> entry : fut.result().entrySet()) {
-                KeyCacheObject key = entry.getKey();
-
+            for (KeyCacheObject key : fut.keys()) {
                 if (inconsistentKeys.contains(key)) {
-                    EntryGetResult res = entry.getValue();
-                    CacheEntryVersion ver = res.version();
+                    Map<ClusterNode, CacheConsistencyViolationEvent.EntryInfo> map =
+                        entries.computeIfAbsent(
+                            ctx.unwrapBinaryIfNeeded(key, !deserializeBinary, false, null), k -> new HashMap<>());
 
-                    Object val = ctx.unwrapBinaryIfNeeded(res.value(), !deserializeBinary, false, null);
+                    EntryGetResult res = fut.result().get(key);
+                    CacheEntryVersion ver = res != null ? res.version() : null;
 
-                    Map<ClusterNode, CacheConsistencyViolationEvent.EntryInfo> map =
-                        originalMap.computeIfAbsent(
-                            ctx.unwrapBinaryIfNeeded(key, false, false, null), k -> new HashMap<>());
+                    Object val = res != null ? ctx.unwrapBinaryIfNeeded(res.value(), !deserializeBinary, false, null) : null;
 
                     boolean primary = primaries.get(key).equals(fut.affNode());
-                    boolean correct = fixedEntries != null && fixedEntries.get(key).equals(res);
+                    boolean correct = fixedEntries != null &&
+                        ((fixedEntries.get(key) != null && fixedEntries.get(key).equals(res)) ||
+                            (fixedEntries.get(key) == null && res == null));
 
                     map.put(node, new EventEntryInfo(val, ver, primary, correct));
                 }
             }
         }
 
+        Map<Object, Object> fixed;
+
+        if (fixedEntries == null)
+            fixed = Collections.emptyMap();
+        else {
+            fixed = new HashMap<>();
+
+            for (Map.Entry<KeyCacheObject, EntryGetResult> entry : fixedEntries.entrySet()) {
+                Object key = ctx.unwrapBinaryIfNeeded(entry.getKey(), !deserializeBinary, false, null);
+                Object val = entry.getValue() != null ?
+                    ctx.unwrapBinaryIfNeeded(entry.getValue().value(), !deserializeBinary, false, null) : null;
+
+                fixed.put(key, val);
+            }
+        }
+
         evtMgr.record(new CacheConsistencyViolationEvent(
             ctx.name(),
             ctx.discovery().localNode(),
             "Consistency violation fixed.",

Review comment:
       It fixed, but `fixed` can be empty? Should we change message here?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairAbstractFuture.java
##########
@@ -128,17 +138,26 @@ protected GridNearReadRepairAbstractFuture(
         this.expiryPlc = expiryPlc;
         this.tx = tx;
 
+        assert strategy != null;
+
+        this.strategy = strategy;
+
         canRemap = topVer == null;
 
-        map(canRemap ? ctx.affinity().affinityTopologyVersion() : topVer);
+        this.topVer = canRemap ? ctx.affinity().affinityTopologyVersion() : topVer;
     }
 
     /**
-     * @param topVer Affinity topology version.
+     *
      */
-    protected synchronized void map(AffinityTopologyVersion topVer) {
-        this.topVer = topVer;
+    protected void init() {
+        map();
+    }
 
+    /**
+     *
+     */
+    protected synchronized void map() {

Review comment:
       What do actually those methods synchronize? 

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
##########
@@ -5115,7 +5118,7 @@ protected V get(
                 ctx.operationContextPerCall(opCtx);
 
                 try (Transaction tx = ctx.grid().transactions().txStart(PESSIMISTIC, SERIALIZABLE)) {
-                    get(key); // Repair.
+                    get((K)key); // Repair.

Review comment:
       Should we invoke directly `repairableGet` here? Looks like `get` make some additional actions we actually don't need here - writing statistics, invokes interceptors, unwrap to KeyCacheObject again. WDYT?

##########
File path: modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencyTest.java
##########
@@ -122,13 +143,17 @@ private void testAtomicAndTx(boolean incVal) throws Exception {
         assertContains(log, testOut.toString(),
             "conflict partitions has been found: [counterConflicts=0, hashConflicts=" + brokenParts.get());
 
-        readRepairTx(brokenParts, txCacheName);
+        Integer fixesPerEntry = fixesPerEntry();
 
-        assertEquals(PARTITIONS, brokenParts.get()); // Half fixed.
+        readRepair(brokenParts, txCacheName, fixesPerEntry);

Review comment:
       Let's separate test for atomic and for transactional caches. It's actually hard to read the test, I firstly though that `brokenParts  == 64` because of `backups == 2` and not because there are 2 caches. Then I can't understand why it fixes only half of conflicts for tx cache. There is no need to test both caches in single test, let's add a test param instead.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairAbstractFuture.java
##########
@@ -254,51 +275,114 @@ else if (!canRemap)
      */
     protected abstract void reduce();
 
+    /**
+     *
+     */
+    protected Map<KeyCacheObject, EntryGetResult> check() throws IgniteCheckedException {

Review comment:
       Please, mark all protected methods with `final`, also please add some javadocs here, at least description of returning value. This method is important and it should be marked as important somehow, because CHECK_ONLY strategy relies on this method only. 

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
##########
@@ -699,12 +701,12 @@ public void onKernalStop() {
             /*skip values*/true,
             false);
 
-        boolean readRepair = opCtx != null && opCtx.readRepair();
+        ReadRepairStrategy readRepairStrategy = opCtx != null ? opCtx.readRepairStrategy() : null;
 
-        if (readRepair)
+        if (readRepairStrategy != null)
             return getWithRepairAsync(
                 fut,
-                () -> repairAsync(key, opCtx, true),
+                (ks) -> repairAsync(ks, opCtx, true),

Review comment:
       Actually we already knows the key to repair to. Because the original method accepts single key. Can we leverage on that here? 

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairAbstractFuture.java
##########
@@ -128,17 +138,26 @@ protected GridNearReadRepairAbstractFuture(
         this.expiryPlc = expiryPlc;
         this.tx = tx;
 
+        assert strategy != null;
+
+        this.strategy = strategy;
+
         canRemap = topVer == null;
 
-        map(canRemap ? ctx.affinity().affinityTopologyVersion() : topVer);
+        this.topVer = canRemap ? ctx.affinity().affinityTopologyVersion() : topVer;
     }
 
     /**
-     * @param topVer Affinity topology version.
+     *
      */
-    protected synchronized void map(AffinityTopologyVersion topVer) {
-        this.topVer = topVer;
+    protected void init() {
+        map();
+    }
 
+    /**
+     *
+     */
+    protected synchronized void map() {

Review comment:
       Should be `private`, also as the `onResult` method.

##########
File path: docs/_docs/tools/control-script.adoc
##########
@@ -1037,21 +1038,22 @@ The command allows to perform cache consistency check and repair (when possible)
 tab:Unix[]
 [source,shell]
 ----
-control.sh --enable-experimental --consistency repair cache-name partition
+control.sh --enable-experimental --consistency repair cache-name partition strategy
 ----
 tab:Window[]
 [source,shell]
 ----
-control.bat --enable-experimental --consistency repair cache-name partition
+control.bat --enable-experimental --consistency repair cache-name partition strategy
 ----
 --
 Parameters:
 
 [cols="1,3",opts="header"]
 |===
 | Parameter | Description
-| `cache-name`| Cache to be checked/repaired..
+| `cache-name`| Cache to be checked/repaired.
 | `partition`| Cache's partition to be checked/repaired.
+| `strategy`| Repair strategy [LWW - default, PRIMARY, MAJORITY, REMOVE, CHECK_ONLY], optional.

Review comment:
       RELATIVE_MAJOIRTY?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairAbstractFuture.java
##########
@@ -254,51 +275,114 @@ else if (!canRemap)
      */
     protected abstract void reduce();
 
+    /**
+     *
+     */
+    protected Map<KeyCacheObject, EntryGetResult> check() throws IgniteCheckedException {
+        Map<KeyCacheObject, EntryGetResult> resMap = new HashMap<>(keys.size());
+        Set<KeyCacheObject> inconsistentKeys = new HashSet<>();
+
+        for (GridPartitionedGetFuture<KeyCacheObject, EntryGetResult> fut : futs.values()) {
+            for (KeyCacheObject key : fut.keys()) {
+                EntryGetResult curRes = fut.result().get(key);
+
+                if (!resMap.containsKey(key)) {
+                    resMap.put(key, curRes);
+
+                    continue;
+                }
+
+                EntryGetResult prevRes = resMap.get(key);
+
+                if (curRes != null) {
+                    if (prevRes == null || prevRes.version().compareTo(curRes.version()) != 0)
+                        inconsistentKeys.add(key);
+                    else {
+                        CacheObjectAdapter curVal = curRes.value();
+                        CacheObjectAdapter prevVal = prevRes.value();
+
+                        byte[] curBytes = curVal.valueBytes(ctx.cacheObjectContext());
+                        byte[] prevBytes = prevVal.valueBytes(ctx.cacheObjectContext());
+
+                        if (!Arrays.equals(curBytes, prevBytes))
+                            inconsistentKeys.add(key);
+
+                    }
+                }
+                else if (prevRes != null)
+                    inconsistentKeys.add(key);
+            }
+        }
+
+        if (!inconsistentKeys.isEmpty())
+            throw new IgniteConsistencyViolationException(inconsistentKeys);
+
+        return resMap;
+    }
+
     /**
      * @param fixedEntries Fixed map.
      */
     protected void recordConsistencyViolation(
-        Set<KeyCacheObject> inconsistentKeys,
-        Map<KeyCacheObject, EntryGetResult> fixedEntries
+        Collection<KeyCacheObject> inconsistentKeys,
+        Map<KeyCacheObject, EntryGetResult> fixedEntries,
+        ReadRepairStrategy strategy
     ) {
         GridEventStorageManager evtMgr = ctx.gridEvents();
 
         if (!evtMgr.isRecordable(EVT_CONSISTENCY_VIOLATION))
             return;
 
-        Map<Object, Map<ClusterNode, CacheConsistencyViolationEvent.EntryInfo>> originalMap = new HashMap<>();
+        Map<Object, Map<ClusterNode, CacheConsistencyViolationEvent.EntryInfo>> entries = new HashMap<>();
 
         for (Map.Entry<ClusterNode, GridPartitionedGetFuture<KeyCacheObject, EntryGetResult>> pair : futs.entrySet()) {
             ClusterNode node = pair.getKey();
 
             GridPartitionedGetFuture<KeyCacheObject, EntryGetResult> fut = pair.getValue();
 
-            for (Map.Entry<KeyCacheObject, EntryGetResult> entry : fut.result().entrySet()) {
-                KeyCacheObject key = entry.getKey();
-
+            for (KeyCacheObject key : fut.keys()) {
                 if (inconsistentKeys.contains(key)) {
-                    EntryGetResult res = entry.getValue();
-                    CacheEntryVersion ver = res.version();
+                    Map<ClusterNode, CacheConsistencyViolationEvent.EntryInfo> map =
+                        entries.computeIfAbsent(
+                            ctx.unwrapBinaryIfNeeded(key, !deserializeBinary, false, null), k -> new HashMap<>());
 
-                    Object val = ctx.unwrapBinaryIfNeeded(res.value(), !deserializeBinary, false, null);
+                    EntryGetResult res = fut.result().get(key);
+                    CacheEntryVersion ver = res != null ? res.version() : null;
 
-                    Map<ClusterNode, CacheConsistencyViolationEvent.EntryInfo> map =
-                        originalMap.computeIfAbsent(
-                            ctx.unwrapBinaryIfNeeded(key, false, false, null), k -> new HashMap<>());
+                    Object val = res != null ? ctx.unwrapBinaryIfNeeded(res.value(), !deserializeBinary, false, null) : null;
 
                     boolean primary = primaries.get(key).equals(fut.affNode());
-                    boolean correct = fixedEntries != null && fixedEntries.get(key).equals(res);
+                    boolean correct = fixedEntries != null &&
+                        ((fixedEntries.get(key) != null && fixedEntries.get(key).equals(res)) ||
+                            (fixedEntries.get(key) == null && res == null));
 
                     map.put(node, new EventEntryInfo(val, ver, primary, correct));
                 }
             }
         }
 
+        Map<Object, Object> fixed;
+
+        if (fixedEntries == null)
+            fixed = Collections.emptyMap();
+        else {
+            fixed = new HashMap<>();
+
+            for (Map.Entry<KeyCacheObject, EntryGetResult> entry : fixedEntries.entrySet()) {
+                Object key = ctx.unwrapBinaryIfNeeded(entry.getKey(), !deserializeBinary, false, null);
+                Object val = entry.getValue() != null ?
+                    ctx.unwrapBinaryIfNeeded(entry.getValue().value(), !deserializeBinary, false, null) : null;
+
+                fixed.put(key, val);
+            }
+        }
+
         evtMgr.record(new CacheConsistencyViolationEvent(
             ctx.name(),
             ctx.discovery().localNode(),
             "Consistency violation fixed.",
-            originalMap));
+            entries,
+            fixed, strategy));

Review comment:
       Move strategy on new line.

##########
File path: docs/_docs/tools/control-script.adoc
##########
@@ -1037,21 +1038,22 @@ The command allows to perform cache consistency check and repair (when possible)
 tab:Unix[]
 [source,shell]
 ----
-control.sh --enable-experimental --consistency repair cache-name partition
+control.sh --enable-experimental --consistency repair cache-name partition strategy
 ----
 tab:Window[]
 [source,shell]
 ----
-control.bat --enable-experimental --consistency repair cache-name partition
+control.bat --enable-experimental --consistency repair cache-name partition strategy
 ----
 --
 Parameters:
 
 [cols="1,3",opts="header"]
 |===
 | Parameter | Description
-| `cache-name`| Cache to be checked/repaired..
+| `cache-name`| Cache to be checked/repaired.
 | `partition`| Cache's partition to be checked/repaired.
+| `strategy`| Repair strategy [LWW - default, PRIMARY, MAJORITY, REMOVE, CHECK_ONLY], optional.

Review comment:
       Remove `default` too.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] anton-vinogradov commented on a change in pull request #9661: IGNITE-15330 Read Repair should support strategies

Posted by GitBox <gi...@apache.org>.
anton-vinogradov commented on a change in pull request #9661:
URL: https://github.com/apache/ignite/pull/9661#discussion_r782903608



##########
File path: modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencyTest.java
##########
@@ -62,6 +66,23 @@
     /** Partitions. */
     private static final int PARTITIONS = 32;
 
+    /** */
+    @Parameterized.Parameters(name = "strategy={0}")
+    public static Iterable<Object[]> data() {
+        List<Object[]> res = new ArrayList<>();
+
+        for (ReadRepairStrategy strategy : ReadRepairStrategy.values())
+            res.add(new Object[] {strategy});
+
+        return res;

Review comment:
       Yes, but it will be hard to extend params in the future.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] anton-vinogradov commented on a change in pull request #9661: IGNITE-15330 Read Repair should support strategies

Posted by GitBox <gi...@apache.org>.
anton-vinogradov commented on a change in pull request #9661:
URL: https://github.com/apache/ignite/pull/9661#discussion_r776385930



##########
File path: modules/core/src/main/java/org/apache/ignite/cache/ReadRepairStrategy.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+
+/**
+ * Read repair strategies.
+ *
+ * @see IgniteCache#withReadRepair(ReadRepairStrategy) for details.
+ */
+public enum ReadRepairStrategy {
+    /** Last write (the newest entry) wins.
+     * <p>
+     * May cause {@link IgniteException} when fix is impossible (unable to detect the newest entry):
+     * <ul>
+     * <li>Null(s) found as well as non-null values for the save key.
+     * <p>
+     * Null (missed entry) has no version, so, it can not be compared with the versioned entry.</li>
+     * <li>Entries with the same version have different values.</li>
+     * </ul>
+     */
+    LWW("LWW"),
+
+    /** Value from the primary node wins. */
+    PRIMARY("PRIMARY"),
+
+    /** The relative majority, any value found more times than any other wins.
+     * <p>
+     * Works for an even number of copies (which is typical of Ignite) instead of an absolute majority.
+     * <p>
+     * May cause {@link IgniteException} when unable to detect value found more times than any other.
+     * <p>
+     * For example, when we have 5 copies and value `A` found twice, but `X`,`Y` and `Z` only once, `A` wins.

Review comment:
       1 primary and 4 backups




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] anton-vinogradov commented on a change in pull request #9661: IGNITE-15330 Read Repair should support strategies

Posted by GitBox <gi...@apache.org>.
anton-vinogradov commented on a change in pull request #9661:
URL: https://github.com/apache/ignite/pull/9661#discussion_r775928454



##########
File path: modules/core/src/main/java/org/apache/ignite/IgniteCache.java
##########
@@ -188,6 +189,19 @@
      * <li>{@link IgniteCache#get} && {@link IgniteCache#getAsync}</li>
      * <li>{@link IgniteCache#getAll} && {@link IgniteCache#getAllAsync}</li>
      * </ul>
+     * @param strategy Read Repair strategy.
+     * @return Cache with explicit consistency check on each read and repair if necessary.
+     */
+    @IgniteExperimental
+    public IgniteCache<K, V> withReadRepair(ReadRepairStrategy strategy);

Review comment:
       removed `withReadRepair()` as well as `ReadRepairStrategy.defaultStrategy()`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] anton-vinogradov commented on a change in pull request #9661: IGNITE-15330 Read Repair should support strategies

Posted by GitBox <gi...@apache.org>.
anton-vinogradov commented on a change in pull request #9661:
URL: https://github.com/apache/ignite/pull/9661#discussion_r775843804



##########
File path: modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencyTest.java
##########
@@ -187,36 +215,48 @@ public void testRepairNonExistentCache() throws Exception {
     /**
      *
      */
-    private void readRepairTx(AtomicInteger brokenParts, String cacheName) {
+    private void readRepair(AtomicInteger brokenParts, String cacheName, Integer fixesPerEntry) {
         for (int i = 0; i < PARTITIONS; i++) {
-            assertEquals(EXIT_CODE_OK, execute("--consistency", "repair", cacheName, String.valueOf(i)));
+            assertEquals(EXIT_CODE_OK, execute("--consistency", "repair", cacheName, String.valueOf(i), strategy.toString()));
             assertContains(log, testOut.toString(), CONSISTENCY_VIOLATIONS_FOUND);
-            assertContains(log, testOut.toString(), "[found=1, fixed=1");
+            assertContains(log, testOut.toString(), "[found=1, fixed=" + (fixesPerEntry != null ? fixesPerEntry.toString() : ""));
 
             assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify"));
 
-            brokenParts.decrementAndGet();
+            if (fixesPerEntry != null)
+                if (fixesPerEntry > 0) {
+                    brokenParts.decrementAndGet();
 
-            if (brokenParts.get() > 0)
-                assertContains(log, testOut.toString(),
-                    "conflict partitions has been found: [counterConflicts=0, hashConflicts=" + brokenParts);
-            else
-                assertContains(log, testOut.toString(), "no conflicts have been found");
+                    if (brokenParts.get() > 0)
+                        assertContains(log, testOut.toString(),
+                            "conflict partitions has been found: [counterConflicts=0, hashConflicts=" + brokenParts);
+                    else
+                        assertContains(log, testOut.toString(), "no conflicts have been found");
+                }
+                else
+                    assertContains(log, testOut.toString(),
+                        "conflict partitions has been found: [counterConflicts=0, hashConflicts=" + brokenParts); // Nothing fixed.
         }
     }
 
     /**
      *
      */
-    private void readRepaitAtomic(AtomicInteger brokenParts, String cacheName) {
-        for (int i = 0; i < PARTITIONS; i++) { // This may be a copy of previous (tx case), implement atomic repair to make this happen :)
-            assertEquals(EXIT_CODE_OK, execute("--consistency", "repair", cacheName, String.valueOf(i)));
-            assertContains(log, testOut.toString(), CONSISTENCY_VIOLATIONS_FOUND);
-            assertContains(log, testOut.toString(), "[found=1, fixed=0"); // Nothing fixed.
+    private Integer fixesPerEntry() {
+        switch (strategy) {
+            case PRIMARY:
+            case REMOVE:
+                return 1;
 
-            assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify"));
-            assertContains(log, testOut.toString(),
-                "conflict partitions has been found: [counterConflicts=0, hashConflicts=" + brokenParts); // Nothing fixed.
+            case CHECK_ONLY:
+                return 0;
+
+            case MAJORITY:
+            case LWW:
+                return null; // Who knows :)

Review comment:
       We have another test to do this.
   See `AbstractReadRepairTest#setDifferentValuesForSameKey` for details




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] timoninmaxim commented on a change in pull request #9661: IGNITE-15330 Read Repair should support strategies

Posted by GitBox <gi...@apache.org>.
timoninmaxim commented on a change in pull request #9661:
URL: https://github.com/apache/ignite/pull/9661#discussion_r785913259



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairFuture.java
##########
@@ -105,40 +189,146 @@ public GridNearReadRepairFuture(
                         else if (compareRes < 0)
                             fixedMap.put(key, newestRes);
                         else if (compareRes == 0) {
-                            CacheObjectAdapter candidateVal = candidateRes.value();
-                            CacheObjectAdapter newestVal = newestRes.value();
-
-                            try {
-                                byte[] candidateBytes = candidateVal.valueBytes(ctx.cacheObjectContext());
-                                byte[] newestBytes = newestVal.valueBytes(ctx.cacheObjectContext());
+                            CacheObject candidateVal = candidateRes.value();
+                            CacheObject newestVal = newestRes.value();
 
-                                if (!Arrays.equals(candidateBytes, newestBytes))
-                                    fixedMap.put(key, newestRes); // Same version, fixing values inconsistency.
-                            }
-                            catch (IgniteCheckedException e) {
-                                onDone(e);
+                            byte[] candidateBytes = candidateVal.valueBytes(ctx.cacheObjectContext());
+                            byte[] newestBytes = newestVal.valueBytes(ctx.cacheObjectContext());
 
-                                return;
-                            }
+                            if (!Arrays.equals(candidateBytes, newestBytes))
+                                irreparableSet.add(key);
                         }
                     }
                 }
                 else if (newestRes != null)
-                    fixedMap.put(key, newestRes); // Existing data wins.
+                    irreparableSet.add(key); // Impossible to detect latest between existing and null.
             }
         }
 
         assert !fixedMap.containsValue(null) : "null should never be considered as a fix";
 
-        if (!fixedMap.isEmpty()) {
-            tx.finishFuture().listen(future -> {
-                TransactionState state = tx.state();
+        if (!irreparableSet.isEmpty())
+            throw new IgniteConsistencyViolationException(irreparableSet);
+
+        return fixedMap;
+    }
+
+    /**
+     *
+     */
+    public Map<KeyCacheObject, EntryGetResult> fixWithPrimary(Collection<KeyCacheObject> inconsistentKeys) {
+        Map<KeyCacheObject, EntryGetResult> fixedMap = new HashMap<>(inconsistentKeys.size());
+
+        for (GridPartitionedGetFuture<KeyCacheObject, EntryGetResult> fut : futs.values()) {
+            for (KeyCacheObject key : fut.keys()) {
+                if (!inconsistentKeys.contains(key) ||
+                    !primaries.get(key).equals(fut.affNode()))
+                    continue;
+
+                fixedMap.put(key, fut.result().get(key));
+            }
+        }
+
+        return fixedMap;
+    }
+
+    /**
+     *
+     */
+    public Map<KeyCacheObject, EntryGetResult> fixWithRemove(Collection<KeyCacheObject> inconsistentKeys) {
+        Map<KeyCacheObject, EntryGetResult> fixedMap = new HashMap<>(inconsistentKeys.size());
+
+        for (KeyCacheObject key : inconsistentKeys)
+            fixedMap.put(key, null);
+
+        return fixedMap;
+    }
+
+    /**
+     *
+     */
+    public Map<KeyCacheObject, EntryGetResult> fixWithMajority(Collection<KeyCacheObject> inconsistentKeys)
+        throws IgniteCheckedException {
+        /** */
+        class ByteArrayWrapper {
+            final byte[] arr;
+
+            /** */
+            public ByteArrayWrapper(byte[] arr) {
+                this.arr = arr;
+            }
+
+            /** */
+            @Override public boolean equals(Object o) {
+                return Arrays.equals(arr, ((ByteArrayWrapper)o).arr);
+            }
 
-                if (state == TransactionState.COMMITTED) // Explicit tx may fix the values but become rolled back later.
-                    recordConsistencyViolation(fixedMap.keySet(), fixedMap);
-            });
+            /** */
+            @Override public int hashCode() {
+                return Arrays.hashCode(arr);
+            }
         }
 
-        onDone(fixedMap);
+        Set<KeyCacheObject> irreparableSet = new HashSet<>(inconsistentKeys.size());
+        Map<KeyCacheObject, EntryGetResult> fixedMap = new HashMap<>(inconsistentKeys.size());
+
+        for (KeyCacheObject inconsistentKey : inconsistentKeys) {
+            Map<T2<ByteArrayWrapper, GridCacheVersion>, T2<EntryGetResult, Integer>> cntMap = new HashMap<>();
+
+            for (GridPartitionedGetFuture<KeyCacheObject, EntryGetResult> fut : futs.values()) {
+                if (!fut.keys().contains(inconsistentKey))
+                    continue;
+
+                EntryGetResult res = fut.result().get(inconsistentKey);
+
+                ByteArrayWrapper wrapped;
+                GridCacheVersion ver;
+
+                if (res != null) {
+                    CacheObject val = res.value();
+
+                    wrapped = new ByteArrayWrapper(val.valueBytes(ctx.cacheObjectContext()));
+                    ver = res.version();
+                }
+                else {
+                    wrapped = new ByteArrayWrapper(null);
+                    ver = null;
+                }
+
+                T2<ByteArrayWrapper, GridCacheVersion> keyVer = new T2<>(wrapped, ver);
+
+                cntMap.putIfAbsent(keyVer, new T2<>(res, 0));
+
+                cntMap.compute(keyVer, (kv, ri) -> new T2<>(ri.getKey(), ri.getValue() + 1));
+            }
+
+            int[] sorted = cntMap.values().stream()
+                .map(IgniteBiTuple::getValue)
+                .sorted(Comparator.reverseOrder())
+                .mapToInt(v -> v)
+                .toArray();
+
+            int max = sorted[0];

Review comment:
       If all nodes didn't send response for the inconsistent key, we will fail here with unbound exception. Is it possible?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] anton-vinogradov commented on a change in pull request #9661: IGNITE-15330 Read Repair should support strategies

Posted by GitBox <gi...@apache.org>.
anton-vinogradov commented on a change in pull request #9661:
URL: https://github.com/apache/ignite/pull/9661#discussion_r785892099



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/IgniteConsistencyViolationException.java
##########
@@ -17,20 +17,35 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.near.consistency;
 
+import java.util.Set;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 
 /**
- * Possible consistency violation exception.
- * Each such case should be rechecked under locks.
+ * Consistency violation exception.
  */
 public class IgniteConsistencyViolationException extends IgniteCheckedException {
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** Inconsistent entries keys. */
+    private final Set<KeyCacheObject> keys;
+
+    /**
+     * @param keys Keys.
+     */
+    public IgniteConsistencyViolationException(Set<KeyCacheObject> keys) {
+        super("Distributed cache consistency violation detected.");
+
+        assert keys != null && !keys.isEmpty();
+
+        this.keys = keys;
+    }
+
     /**
-     * @param msg Message.
+     * Found but unrepaired (because of chosen strategy) inconsistent entries keys.

Review comment:
       Possible cases are 
   1) CHECK_ONLY - always adds all broken keys to this set.
   2) LWW and RELATIVE_MAJORITY, where some or all keys can not be repaired.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] timoninmaxim commented on a change in pull request #9661: IGNITE-15330 Read Repair should support strategies

Posted by GitBox <gi...@apache.org>.
timoninmaxim commented on a change in pull request #9661:
URL: https://github.com/apache/ignite/pull/9661#discussion_r785906082



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairFuture.java
##########
@@ -105,40 +189,146 @@ public GridNearReadRepairFuture(
                         else if (compareRes < 0)
                             fixedMap.put(key, newestRes);
                         else if (compareRes == 0) {
-                            CacheObjectAdapter candidateVal = candidateRes.value();
-                            CacheObjectAdapter newestVal = newestRes.value();
-
-                            try {
-                                byte[] candidateBytes = candidateVal.valueBytes(ctx.cacheObjectContext());
-                                byte[] newestBytes = newestVal.valueBytes(ctx.cacheObjectContext());
+                            CacheObject candidateVal = candidateRes.value();
+                            CacheObject newestVal = newestRes.value();
 
-                                if (!Arrays.equals(candidateBytes, newestBytes))
-                                    fixedMap.put(key, newestRes); // Same version, fixing values inconsistency.
-                            }
-                            catch (IgniteCheckedException e) {
-                                onDone(e);
+                            byte[] candidateBytes = candidateVal.valueBytes(ctx.cacheObjectContext());
+                            byte[] newestBytes = newestVal.valueBytes(ctx.cacheObjectContext());
 
-                                return;
-                            }
+                            if (!Arrays.equals(candidateBytes, newestBytes))
+                                irreparableSet.add(key);
                         }
                     }
                 }
                 else if (newestRes != null)
-                    fixedMap.put(key, newestRes); // Existing data wins.
+                    irreparableSet.add(key); // Impossible to detect latest between existing and null.
             }
         }
 
         assert !fixedMap.containsValue(null) : "null should never be considered as a fix";
 
-        if (!fixedMap.isEmpty()) {
-            tx.finishFuture().listen(future -> {
-                TransactionState state = tx.state();
+        if (!irreparableSet.isEmpty())
+            throw new IgniteConsistencyViolationException(irreparableSet);
+
+        return fixedMap;
+    }
+
+    /**
+     *
+     */
+    public Map<KeyCacheObject, EntryGetResult> fixWithPrimary(Collection<KeyCacheObject> inconsistentKeys) {
+        Map<KeyCacheObject, EntryGetResult> fixedMap = new HashMap<>(inconsistentKeys.size());
+
+        for (GridPartitionedGetFuture<KeyCacheObject, EntryGetResult> fut : futs.values()) {
+            for (KeyCacheObject key : fut.keys()) {
+                if (!inconsistentKeys.contains(key) ||
+                    !primaries.get(key).equals(fut.affNode()))
+                    continue;
+
+                fixedMap.put(key, fut.result().get(key));
+            }
+        }
+
+        return fixedMap;
+    }
+
+    /**
+     *
+     */
+    public Map<KeyCacheObject, EntryGetResult> fixWithRemove(Collection<KeyCacheObject> inconsistentKeys) {
+        Map<KeyCacheObject, EntryGetResult> fixedMap = new HashMap<>(inconsistentKeys.size());
+
+        for (KeyCacheObject key : inconsistentKeys)
+            fixedMap.put(key, null);
+
+        return fixedMap;
+    }
+
+    /**
+     *
+     */
+    public Map<KeyCacheObject, EntryGetResult> fixWithMajority(Collection<KeyCacheObject> inconsistentKeys)
+        throws IgniteCheckedException {
+        /** */
+        class ByteArrayWrapper {
+            final byte[] arr;
+
+            /** */
+            public ByteArrayWrapper(byte[] arr) {
+                this.arr = arr;
+            }
+
+            /** */
+            @Override public boolean equals(Object o) {
+                return Arrays.equals(arr, ((ByteArrayWrapper)o).arr);
+            }
 
-                if (state == TransactionState.COMMITTED) // Explicit tx may fix the values but become rolled back later.
-                    recordConsistencyViolation(fixedMap.keySet(), fixedMap);
-            });
+            /** */
+            @Override public int hashCode() {
+                return Arrays.hashCode(arr);
+            }
         }
 
-        onDone(fixedMap);
+        Set<KeyCacheObject> irreparableSet = new HashSet<>(inconsistentKeys.size());
+        Map<KeyCacheObject, EntryGetResult> fixedMap = new HashMap<>(inconsistentKeys.size());
+
+        for (KeyCacheObject inconsistentKey : inconsistentKeys) {
+            Map<T2<ByteArrayWrapper, GridCacheVersion>, T2<EntryGetResult, Integer>> cntMap = new HashMap<>();
+
+            for (GridPartitionedGetFuture<KeyCacheObject, EntryGetResult> fut : futs.values()) {
+                if (!fut.keys().contains(inconsistentKey))
+                    continue;

Review comment:
       Should we add a WARN message here, that we skips info from one of backups? What if a correct solution is remove element by RELATIVE_MAJORITY (multiple nodes doesn't provide result for get())?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] anton-vinogradov commented on a change in pull request #9661: IGNITE-15330 Read Repair should support strategies

Posted by GitBox <gi...@apache.org>.
anton-vinogradov commented on a change in pull request #9661:
URL: https://github.com/apache/ignite/pull/9661#discussion_r783002497



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairAbstractFuture.java
##########
@@ -128,17 +138,26 @@ protected GridNearReadRepairAbstractFuture(
         this.expiryPlc = expiryPlc;
         this.tx = tx;
 
+        assert strategy != null;
+
+        this.strategy = strategy;
+
         canRemap = topVer == null;
 
-        map(canRemap ? ctx.affinity().affinityTopologyVersion() : topVer);
+        this.topVer = canRemap ? ctx.affinity().affinityTopologyVersion() : topVer;
     }
 
     /**
-     * @param topVer Affinity topology version.
+     *
      */
-    protected synchronized void map(AffinityTopologyVersion topVer) {
-        this.topVer = topVer;
+    protected void init() {
+        map();
+    }
 
+    /**
+     *
+     */
+    protected synchronized void map() {

Review comment:
       `onResult` and `map` should be synced because can be run in an async way from different threads.
   This going to be simplified/refactored at IGNITE-16071.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] timoninmaxim commented on a change in pull request #9661: IGNITE-15330 Read Repair should support strategies

Posted by GitBox <gi...@apache.org>.
timoninmaxim commented on a change in pull request #9661:
URL: https://github.com/apache/ignite/pull/9661#discussion_r776203084



##########
File path: modules/core/src/main/java/org/apache/ignite/cache/ReadRepairStrategy.java
##########
@@ -18,23 +18,43 @@
 package org.apache.ignite.cache;
 
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
 
 /**
  * Read repair strategies.
  *
  * @see IgniteCache#withReadRepair(ReadRepairStrategy) for details.
  */
 public enum ReadRepairStrategy {
-    /** Last write wins. */
+    /** Last write (the newest entry) wins.
+     * <p>
+     * May cause {@link IgniteException} when fix is impossible (unable to detect the newest entry):
+     * <ul>
+     * <li>Null(s) found as well as non-null values for the save key.

Review comment:
       s/save/same/

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java
##########
@@ -232,7 +233,7 @@ public void setCacheManager(org.apache.ignite.cache.CacheManager cacheMgr) {
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteCache<K, V> withReadRepair() {
+    @Override public IgniteCache<K, V> withReadRepair(ReadRepairStrategy strategy) {

Review comment:
       Let's add assert that the strategy isn't `null` here.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/IgniteIrreparableConsistencyViolationException.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near.consistency;
+
+import java.util.Collection;
+
+/**
+ * Irreparable consistency violation exception.
+ */
+public class IgniteIrreparableConsistencyViolationException extends IgniteConsistencyViolationException {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Irreparable keys. */
+    private final Collection<?> irreparableKeys;

Review comment:
       Can be replaced with `Collection<Object>`

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java
##########
@@ -92,7 +93,7 @@ public CacheOperationContext(
         boolean noRetries,
         @Nullable Byte dataCenterId,
         boolean recovery,
-        boolean readRepair,
+        ReadRepairStrategy readRepairStrategy,

Review comment:
       There is already `@Nullable` in the signature. Let's keep it consistent

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
##########
@@ -191,7 +192,7 @@ public GridDistributedCacheEntry entryExx(
         final CacheOperationContext opCtx = ctx.operationContextPerCall();
 
         final boolean recovery = opCtx != null && opCtx.recovery();
-        final boolean readRepair = opCtx != null && opCtx.readRepair();
+        final ReadRepairStrategy readRepairStrategy = opCtx != null ? opCtx.readRepairStrategy() : null;

Review comment:
       Can replace it with boolean flag for better readability. It looks strange that we extract a strategy here, but don't use it and run only consistency check and not repair.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
##########
@@ -4896,13 +4901,13 @@ protected V get(
             /*skip vals*/false,
             needVer);
 
-        if (readRepair) {
+        if (readRepairStrategy != null) {
             CacheOperationContext opCtx = ctx.operationContextPerCall();
 
             return getWithRepairAsync(
                 fut,
                 () -> repairAsync(key, opCtx, false),
-                () -> repairableGetAsync(key, deserializeBinary, needVer, readRepair));
+                () -> repairableGetAsync(key, deserializeBinary, needVer, readRepairStrategy));

Review comment:
       It's ok to have the boolean flag here too. Let's use `strategy` only in the place actually performs repair.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridCompoundReadRepairFuture.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near.consistency;
+
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.lang.IgniteInClosure;
+
+/**
+ * Compound future that represents the result of the external fixes for some keys.
+ */
+public class GridCompoundReadRepairFuture extends GridFutureAdapter<Void> implements IgniteInClosure<IgniteInternalFuture<Void>> {
+    /** Initialization flag. */
+    private static final int INIT_FLAG = 0x1;
+
+    /** Flags updater. */
+    private static final AtomicIntegerFieldUpdater<GridCompoundReadRepairFuture> FLAGS_UPD =
+        AtomicIntegerFieldUpdater.newUpdater(GridCompoundReadRepairFuture.class, "initFlag");
+
+    /** Listener calls updater. */
+    private static final AtomicIntegerFieldUpdater<GridCompoundReadRepairFuture> LSNR_CALLS_UPD =
+        AtomicIntegerFieldUpdater.newUpdater(GridCompoundReadRepairFuture.class, "lsnrCalls");
+
+    /** Initialization flag. Updated via {@link #FLAGS_UPD}. */
+    private volatile int initFlag;
+
+    /** Listener calls. */
+    private volatile int lsnrCalls;
+
+    /** Count of compounds in the future. */
+    private volatile int size;
+
+    /** Keys. */
+    private volatile Collection<Object> keys;
+
+    /** Irreparable Keys. */
+    private volatile Collection<Object> irreparableKeys;
+
+    /**
+     * @param fut Future.
+     */
+    public void add(IgniteInternalFuture<Void> fut) {
+        size++; // All additions are from the same thread.
+
+        fut.listen(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void apply(IgniteInternalFuture<Void> fut) {
+        Throwable e = fut.error();
+
+        if (e != null) {
+            if (e instanceof IgniteConsistencyViolationException) {
+                synchronized (this) {

Review comment:
       Can replace with the DCL pattern to synchronize on initialization of collections only.

##########
File path: modules/core/src/main/java/org/apache/ignite/cache/ReadRepairStrategy.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+
+/**
+ * Read repair strategies.
+ *
+ * @see IgniteCache#withReadRepair(ReadRepairStrategy) for details.
+ */
+public enum ReadRepairStrategy {
+    /** Last write (the newest entry) wins.
+     * <p>
+     * May cause {@link IgniteException} when fix is impossible (unable to detect the newest entry):
+     * <ul>
+     * <li>Null(s) found as well as non-null values for the save key.
+     * <p>
+     * Null (missed entry) has no version, so, it can not be compared with the versioned entry.</li>
+     * <li>Entries with the same version have different values.</li>
+     * </ul>
+     */
+    LWW("LWW"),
+
+    /** Value from the primary node wins. */
+    PRIMARY("PRIMARY"),
+
+    /** The relative majority, any value found more times than any other wins.
+     * <p>
+     * Works for an even number of copies (which is typical of Ignite) instead of an absolute majority.
+     * <p>
+     * May cause {@link IgniteException} when unable to detect value found more times than any other.
+     * <p>
+     * For example, when we have 5 copies and value `A` found twice, but `X`,`Y` and `Z` only once, `A` wins.

Review comment:
       But it's said 
   
   > works for an even number of copies 
   
   What does it mean in case of 5 copies?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
##########
@@ -699,9 +701,9 @@ public void onKernalStop() {
             /*skip values*/true,
             false);
 
-        boolean readRepair = opCtx != null && opCtx.readRepair();

Review comment:
       Here and below. I like this boolean here. It's a little bit straightforward from my point of view.
   
   `boolean readRepair = opCtx != null && opCtx.readRepairStrategy() != null`

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridCompoundReadRepairFuture.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near.consistency;
+
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.lang.IgniteInClosure;
+
+/**
+ * Compound future that represents the result of the external fixes for some keys.
+ */
+public class GridCompoundReadRepairFuture extends GridFutureAdapter<Void> implements IgniteInClosure<IgniteInternalFuture<Void>> {
+    /** Initialization flag. */
+    private static final int INIT_FLAG = 0x1;
+
+    /** Flags updater. */
+    private static final AtomicIntegerFieldUpdater<GridCompoundReadRepairFuture> FLAGS_UPD =
+        AtomicIntegerFieldUpdater.newUpdater(GridCompoundReadRepairFuture.class, "initFlag");
+
+    /** Listener calls updater. */
+    private static final AtomicIntegerFieldUpdater<GridCompoundReadRepairFuture> LSNR_CALLS_UPD =
+        AtomicIntegerFieldUpdater.newUpdater(GridCompoundReadRepairFuture.class, "lsnrCalls");
+
+    /** Initialization flag. Updated via {@link #FLAGS_UPD}. */
+    private volatile int initFlag;
+
+    /** Listener calls. */
+    private volatile int lsnrCalls;
+
+    /** Count of compounds in the future. */
+    private volatile int size;
+
+    /** Keys. */
+    private volatile Collection<Object> keys;
+
+    /** Irreparable Keys. */
+    private volatile Collection<Object> irreparableKeys;
+
+    /**
+     * @param fut Future.
+     */
+    public void add(IgniteInternalFuture<Void> fut) {
+        size++; // All additions are from the same thread.
+
+        fut.listen(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void apply(IgniteInternalFuture<Void> fut) {
+        Throwable e = fut.error();
+
+        if (e != null) {
+            if (e instanceof IgniteConsistencyViolationException) {
+                synchronized (this) {
+                    Collection<?> keys = ((IgniteConsistencyViolationException)e).keys();
+
+                    if (this.keys == null)
+                        this.keys = new GridConcurrentHashSet<>();
+
+                    this.keys.addAll(keys);
+
+                    if (e instanceof IgniteIrreparableConsistencyViolationException) {
+                        Collection<?> irreparableKeys = ((IgniteIrreparableConsistencyViolationException)e).irreparableKeys();
+
+                        if (this.irreparableKeys == null)
+                            this.irreparableKeys = new GridConcurrentHashSet<>();
+
+                        this.irreparableKeys.addAll(irreparableKeys);
+                    }
+                }
+            }
+            else
+                onDone(e);
+        }
+
+        LSNR_CALLS_UPD.incrementAndGet(this);
+
+        checkComplete();
+    }
+
+    /**
+     * Mark this future as initialized.
+     */
+    public final void markInitialized() {
+        if (FLAGS_UPD.compareAndSet(this, 0, INIT_FLAG))

Review comment:
       `markInitialized` invoked in the same thread that invokes the `add` method. Is it required to have the CAS operation here?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/IgniteIrreparableConsistencyViolationException.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near.consistency;
+
+import java.util.Collection;
+
+/**
+ * Irreparable consistency violation exception.
+ */
+public class IgniteIrreparableConsistencyViolationException extends IgniteConsistencyViolationException {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Irreparable keys. */
+    private final Collection<?> irreparableKeys;
+
+    /**
+     * @param keys            Keys.
+     * @param irreparableKeys Irreparable keys.
+     */
+    public IgniteIrreparableConsistencyViolationException(Collection<?> keys, Collection<?> irreparableKeys) {
+        super(keys);

Review comment:
       Should we change error msg too? Currently it uses the message from super class. I think we should write down info about irreparable keys, used strategy, and made some explanation why exception happened. 

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairAbstractFuture.java
##########
@@ -254,51 +266,114 @@ else if (!canRemap)
      */
     protected abstract void reduce();
 
+    /**
+     *
+     */
+    public Map<KeyCacheObject, EntryGetResult> check() throws IgniteCheckedException {

Review comment:
       Can we make it `protected`?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridCompoundReadRepairFuture.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near.consistency;
+
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.lang.IgniteInClosure;
+
+/**
+ * Compound future that represents the result of the external fixes for some keys.
+ */
+public class GridCompoundReadRepairFuture extends GridFutureAdapter<Void> implements IgniteInClosure<IgniteInternalFuture<Void>> {
+    /** Initialization flag. */
+    private static final int INIT_FLAG = 0x1;
+
+    /** Flags updater. */
+    private static final AtomicIntegerFieldUpdater<GridCompoundReadRepairFuture> FLAGS_UPD =
+        AtomicIntegerFieldUpdater.newUpdater(GridCompoundReadRepairFuture.class, "initFlag");
+
+    /** Listener calls updater. */
+    private static final AtomicIntegerFieldUpdater<GridCompoundReadRepairFuture> LSNR_CALLS_UPD =
+        AtomicIntegerFieldUpdater.newUpdater(GridCompoundReadRepairFuture.class, "lsnrCalls");
+
+    /** Initialization flag. Updated via {@link #FLAGS_UPD}. */
+    private volatile int initFlag;
+
+    /** Listener calls. */
+    private volatile int lsnrCalls;
+
+    /** Count of compounds in the future. */
+    private volatile int size;
+
+    /** Keys. */
+    private volatile Collection<Object> keys;
+
+    /** Irreparable Keys. */
+    private volatile Collection<Object> irreparableKeys;
+
+    /**
+     * @param fut Future.
+     */
+    public void add(IgniteInternalFuture<Void> fut) {
+        size++; // All additions are from the same thread.
+
+        fut.listen(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void apply(IgniteInternalFuture<Void> fut) {
+        Throwable e = fut.error();
+
+        if (e != null) {
+            if (e instanceof IgniteConsistencyViolationException) {
+                synchronized (this) {
+                    Collection<?> keys = ((IgniteConsistencyViolationException)e).keys();
+
+                    if (this.keys == null)
+                        this.keys = new GridConcurrentHashSet<>();

Review comment:
       Can we use `ConcurrentHashMap.newKeySet()` here?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridCompoundReadRepairFuture.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near.consistency;
+
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.lang.IgniteInClosure;
+
+/**
+ * Compound future that represents the result of the external fixes for some keys.
+ */
+public class GridCompoundReadRepairFuture extends GridFutureAdapter<Void> implements IgniteInClosure<IgniteInternalFuture<Void>> {
+    /** Initialization flag. */
+    private static final int INIT_FLAG = 0x1;
+
+    /** Flags updater. */
+    private static final AtomicIntegerFieldUpdater<GridCompoundReadRepairFuture> FLAGS_UPD =
+        AtomicIntegerFieldUpdater.newUpdater(GridCompoundReadRepairFuture.class, "initFlag");
+
+    /** Listener calls updater. */
+    private static final AtomicIntegerFieldUpdater<GridCompoundReadRepairFuture> LSNR_CALLS_UPD =
+        AtomicIntegerFieldUpdater.newUpdater(GridCompoundReadRepairFuture.class, "lsnrCalls");
+
+    /** Initialization flag. Updated via {@link #FLAGS_UPD}. */
+    private volatile int initFlag;

Review comment:
       Can we use boolean instead?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
##########
@@ -1712,20 +1712,6 @@ void updateAllAsyncInternal(
         }
     }
 
-    /** {@inheritDoc} */
-    @Override protected IgniteInternalFuture<Void> repairAsync(Collection<? extends K> keys,

Review comment:
       Now `GridCacheAdapter#repairAsync` can be `private`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] timoninmaxim commented on a change in pull request #9661: IGNITE-15330 Read Repair should support strategies

Posted by GitBox <gi...@apache.org>.
timoninmaxim commented on a change in pull request #9661:
URL: https://github.com/apache/ignite/pull/9661#discussion_r776373097



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
##########
@@ -4833,6 +4835,9 @@ private boolean clearLocally0(K key, boolean readers) {
                 deserializeBinary,
                 needVer);
         }
+        catch (IgniteIrreparableConsistencyViolationException e) {
+            throw e;
+        }
         catch (IgniteConsistencyViolationException e) {
             repairAsync(key, ctx.operationContextPerCall(), false).get();

Review comment:
       Do not repair here for CHECK_ONLY.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] timoninmaxim commented on a change in pull request #9661: IGNITE-15330 Read Repair should support strategies

Posted by GitBox <gi...@apache.org>.
timoninmaxim commented on a change in pull request #9661:
URL: https://github.com/apache/ignite/pull/9661#discussion_r776372863



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
##########
@@ -4833,6 +4835,9 @@ private boolean clearLocally0(K key, boolean readers) {
                 deserializeBinary,
                 needVer);
         }
+        catch (IgniteIrreparableConsistencyViolationException e) {
+            throw e;
+        }
         catch (IgniteConsistencyViolationException e) {
             repairAsync(key, ctx.operationContextPerCall(), false).get();

Review comment:
       Repair only failed keys `e.keys()`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org