You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by is...@apache.org on 2018/10/09 14:47:17 UTC

[02/21] ignite git commit: IGNITE-9761 Fixed deadlock in WAL manager - Fixes #4890.

IGNITE-9761 Fixed deadlock in WAL manager - Fixes #4890.

Signed-off-by: Alexey Goncharuk <al...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/58dfe061
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/58dfe061
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/58dfe061

Branch: refs/heads/ignite-2.7-master
Commit: 58dfe061cf8b4c18ac57fa762a559d711cfbf274
Parents: 762e2f4
Author: Anton Kalashnikov <ka...@yandex.ru>
Authored: Tue Oct 2 18:26:20 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Tue Oct 2 19:01:35 2018 +0300

----------------------------------------------------------------------
 .../wal/aware/SegmentArchivedStorage.java       |  8 ++--
 .../persistence/wal/aware/SegmentAware.java     |  9 +++++
 .../wal/aware/SegmentLockStorage.java           | 27 +++++--------
 .../wal/aware/SegmentObservable.java            | 10 ++---
 .../persistence/wal/aware/SegmentAwareTest.java | 42 +++++++++++++++++++-
 5 files changed, 70 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/58dfe061/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java
index 1ed607e..c526ae1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java
@@ -63,10 +63,12 @@ class SegmentArchivedStorage extends SegmentObservable {
     /**
      * @param lastAbsArchivedIdx New value of last archived segment index.
      */
-    synchronized void setLastArchivedAbsoluteIndex(long lastAbsArchivedIdx) {
-        this.lastAbsArchivedIdx = lastAbsArchivedIdx;
+    void setLastArchivedAbsoluteIndex(long lastAbsArchivedIdx) {
+        synchronized (this) {
+            this.lastAbsArchivedIdx = lastAbsArchivedIdx;
 
-        notifyAll();
+            notifyAll();
+        }
 
         notifyObservers(lastAbsArchivedIdx);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/58dfe061/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
index 6ba0399..e46d93f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
@@ -220,6 +220,15 @@ public class SegmentAware {
     }
 
     /**
+     * Visible for testing.
+     *
+     * @param absIdx Segment absolute index. To unlock the segment later, use {@link #releaseWorkSegment}.
+     */
+    void lockWorkSegment(long absIdx) {
+        segmentLockStorage.lockWorkSegment(absIdx);
+    }
+
+    /**
      * @param absIdx Segment absolute index.
      */
     public void releaseWorkSegment(long absIdx) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/58dfe061/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentLockStorage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentLockStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentLockStorage.java
index 2e145e7..f638d4d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentLockStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentLockStorage.java
@@ -17,8 +17,8 @@
 
 package org.apache.ignite.internal.processors.cache.persistence.wal.aware;
 
-import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
 
 /**
@@ -29,7 +29,7 @@ public class SegmentLockStorage extends SegmentObservable {
      * Maps absolute segment index to locks counter. Lock on segment protects from archiving segment and may come from
      * {@link FileWriteAheadLogManager.RecordsIterator} during WAL replay. Map itself is guarded by <code>this</code>.
      */
-    private Map<Long, Integer> locked = new HashMap<>();
+    private Map<Long, Integer> locked = new ConcurrentHashMap<>();
 
     /**
      * Check if WAL segment locked (protected from move to archive)
@@ -37,7 +37,7 @@ public class SegmentLockStorage extends SegmentObservable {
      * @param absIdx Index for check reservation.
      * @return {@code True} if index is locked.
      */
-    public synchronized boolean locked(long absIdx) {
+    public boolean locked(long absIdx) {
         return locked.containsKey(absIdx);
     }
 
@@ -47,12 +47,8 @@ public class SegmentLockStorage extends SegmentObservable {
      * segment later, use {@link #releaseWorkSegment} for unlock</li> </ul>
      */
     @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
-    synchronized boolean lockWorkSegment(long absIdx) {
-        Integer cur = locked.get(absIdx);
-
-        cur = cur == null ? 1 : cur + 1;
-
-        locked.put(absIdx, cur);
+    boolean lockWorkSegment(long absIdx) {
+        locked.compute(absIdx, (idx, count) -> count == null ? 1 : count + 1);
 
         return false;
     }
@@ -61,15 +57,12 @@ public class SegmentLockStorage extends SegmentObservable {
      * @param absIdx Segment absolute index.
      */
     @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
-    synchronized void releaseWorkSegment(long absIdx) {
-        Integer cur = locked.get(absIdx);
-
-        assert cur != null && cur >= 1 : "cur=" + cur + ", absIdx=" + absIdx;
+    void releaseWorkSegment(long absIdx) {
+        locked.compute(absIdx, (idx, count) -> {
+            assert count != null && count >= 1 : "cur=" + count + ", absIdx=" + absIdx;
 
-        if (cur == 1)
-            locked.remove(absIdx);
-        else
-            locked.put(absIdx, cur - 1);
+            return count == 1 ? null : count - 1;
+        });
 
         notifyObservers(absIdx);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/58dfe061/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentObservable.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentObservable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentObservable.java
index ba5ad30..3e91504 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentObservable.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentObservable.java
@@ -17,8 +17,8 @@
 
 package org.apache.ignite.internal.processors.cache.persistence.wal.aware;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.function.Consumer;
 
 /**
@@ -26,12 +26,12 @@ import java.util.function.Consumer;
  */
 public abstract class SegmentObservable {
     /** Observers for handle changes of archived index. */
-    private final List<Consumer<Long>> observers = new ArrayList<>();
+    private final Queue<Consumer<Long>> observers = new ConcurrentLinkedQueue<>();
 
     /**
      * @param observer Observer for notification about segment's changes.
      */
-    synchronized void addObserver(Consumer<Long> observer) {
+    void addObserver(Consumer<Long> observer) {
         observers.add(observer);
     }
 
@@ -40,7 +40,7 @@ public abstract class SegmentObservable {
      *
      * @param segmentId Segment which was been changed.
      */
-    synchronized void notifyObservers(long segmentId) {
+    void notifyObservers(long segmentId) {
         observers.forEach(observer -> observer.accept(segmentId));
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/58dfe061/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
index 7840b09..0869356 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
@@ -31,6 +31,46 @@ import static org.junit.Assert.assertThat;
  * Test for {@link SegmentAware}.
  */
 public class SegmentAwareTest extends TestCase {
+
+    /**
+     * Checks that the call chains SegmentArchivedStorage.markAsMovedToArchive -> SegmentLockStorage.locked and
+     * SegmentLockStorage.releaseWorkSegment -> SegmentArchivedStorage.onSegmentUnlocked do not deadlock.
+     *
+     * @throws IgniteCheckedException if failed.
+     */
+    public void testAvoidDeadlockArchiverAndLockStorage() throws IgniteCheckedException {
+        SegmentAware aware = new SegmentAware(10, false);
+
+        int iterationCnt = 100_000;
+        int segmentToHandle = 1;
+
+        IgniteInternalFuture archiverThread = GridTestUtils.runAsync(() -> {
+            int i = iterationCnt;
+
+            while (i-- > 0) {
+                try {
+                    aware.markAsMovedToArchive(segmentToHandle);
+                }
+                catch (IgniteInterruptedCheckedException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        });
+
+        IgniteInternalFuture lockerThread = GridTestUtils.runAsync(() -> {
+            int i = iterationCnt;
+
+            while (i-- > 0) {
+                aware.lockWorkSegment(segmentToHandle);
+
+                aware.releaseWorkSegment(segmentToHandle);
+            }
+        });
+
+        archiverThread.get();
+        lockerThread.get();
+    }
+
     /**
      * Waiting finished when work segment is set.
      */
@@ -435,7 +475,7 @@ public class SegmentAwareTest extends TestCase {
     public void testLastCompressedIdxProperOrdering() throws IgniteInterruptedCheckedException {
         SegmentAware aware = new SegmentAware(10, true);
 
-        for (int i = 0; i < 5 ; i++) {
+        for (int i = 0; i < 5; i++) {
             aware.setLastArchivedAbsoluteIndex(i);
             aware.waitNextSegmentToCompress();
         }