You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by is...@apache.org on 2018/10/09 14:47:17 UTC
[02/21] ignite git commit: IGNITE-9761 Fixed deadlock in WAL manager
- Fixes #4890.
IGNITE-9761 Fixed deadlock in WAL manager - Fixes #4890.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/58dfe061
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/58dfe061
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/58dfe061
Branch: refs/heads/ignite-2.7-master
Commit: 58dfe061cf8b4c18ac57fa762a559d711cfbf274
Parents: 762e2f4
Author: Anton Kalashnikov <ka...@yandex.ru>
Authored: Tue Oct 2 18:26:20 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Tue Oct 2 19:01:35 2018 +0300
----------------------------------------------------------------------
.../wal/aware/SegmentArchivedStorage.java | 8 ++--
.../persistence/wal/aware/SegmentAware.java | 9 +++++
.../wal/aware/SegmentLockStorage.java | 27 +++++--------
.../wal/aware/SegmentObservable.java | 10 ++---
.../persistence/wal/aware/SegmentAwareTest.java | 42 +++++++++++++++++++-
5 files changed, 70 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/58dfe061/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java
index 1ed607e..c526ae1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java
@@ -63,10 +63,12 @@ class SegmentArchivedStorage extends SegmentObservable {
/**
* @param lastAbsArchivedIdx New value of last archived segment index.
*/
- synchronized void setLastArchivedAbsoluteIndex(long lastAbsArchivedIdx) {
- this.lastAbsArchivedIdx = lastAbsArchivedIdx;
+ void setLastArchivedAbsoluteIndex(long lastAbsArchivedIdx) {
+ synchronized (this) {
+ this.lastAbsArchivedIdx = lastAbsArchivedIdx;
- notifyAll();
+ notifyAll();
+ }
notifyObservers(lastAbsArchivedIdx);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/58dfe061/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
index 6ba0399..e46d93f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
@@ -220,6 +220,15 @@ public class SegmentAware {
}
/**
+ * Visible for test.
+ *
+ * @param absIdx Segment absolute index. Use {@link #releaseWorkSegment} to unlock the segment later.
+ */
+ void lockWorkSegment(long absIdx) {
+ segmentLockStorage.lockWorkSegment(absIdx);
+ }
+
+ /**
* @param absIdx Segment absolute index.
*/
public void releaseWorkSegment(long absIdx) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/58dfe061/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentLockStorage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentLockStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentLockStorage.java
index 2e145e7..f638d4d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentLockStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentLockStorage.java
@@ -17,8 +17,8 @@
package org.apache.ignite.internal.processors.cache.persistence.wal.aware;
-import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
/**
@@ -29,7 +29,7 @@ public class SegmentLockStorage extends SegmentObservable {
* Maps absolute segment index to locks counter. Lock on segment protects from archiving segment and may come from
* {@link FileWriteAheadLogManager.RecordsIterator} during WAL replay. Map itself is guarded by <code>this</code>.
*/
- private Map<Long, Integer> locked = new HashMap<>();
+ private Map<Long, Integer> locked = new ConcurrentHashMap<>();
/**
* Checks if WAL segment is locked (protected from being moved to archive).
@@ -37,7 +37,7 @@ public class SegmentLockStorage extends SegmentObservable {
* @param absIdx Index for check reservation.
* @return {@code True} if index is locked.
*/
- public synchronized boolean locked(long absIdx) {
+ public boolean locked(long absIdx) {
return locked.containsKey(absIdx);
}
@@ -47,12 +47,8 @@ public class SegmentLockStorage extends SegmentObservable {
* segment later, use {@link #releaseWorkSegment} for unlock</li> </ul>
*/
@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
- synchronized boolean lockWorkSegment(long absIdx) {
- Integer cur = locked.get(absIdx);
-
- cur = cur == null ? 1 : cur + 1;
-
- locked.put(absIdx, cur);
+ boolean lockWorkSegment(long absIdx) {
+ locked.compute(absIdx, (idx, count) -> count == null ? 1 : count + 1);
return false;
}
@@ -61,15 +57,12 @@ public class SegmentLockStorage extends SegmentObservable {
* @param absIdx Segment absolute index.
*/
@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
- synchronized void releaseWorkSegment(long absIdx) {
- Integer cur = locked.get(absIdx);
-
- assert cur != null && cur >= 1 : "cur=" + cur + ", absIdx=" + absIdx;
+ void releaseWorkSegment(long absIdx) {
+ locked.compute(absIdx, (idx, count) -> {
+ assert count != null && count >= 1 : "cur=" + count + ", absIdx=" + absIdx;
- if (cur == 1)
- locked.remove(absIdx);
- else
- locked.put(absIdx, cur - 1);
+ return count == 1 ? null : count - 1;
+ });
notifyObservers(absIdx);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/58dfe061/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentObservable.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentObservable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentObservable.java
index ba5ad30..3e91504 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentObservable.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentObservable.java
@@ -17,8 +17,8 @@
package org.apache.ignite.internal.processors.cache.persistence.wal.aware;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;
/**
@@ -26,12 +26,12 @@ import java.util.function.Consumer;
*/
public abstract class SegmentObservable {
/** Observers for handle changes of archived index. */
- private final List<Consumer<Long>> observers = new ArrayList<>();
+ private final Queue<Consumer<Long>> observers = new ConcurrentLinkedQueue<>();
/**
* @param observer Observer for notification about segment's changes.
*/
- synchronized void addObserver(Consumer<Long> observer) {
+ void addObserver(Consumer<Long> observer) {
observers.add(observer);
}
@@ -40,7 +40,7 @@ public abstract class SegmentObservable {
*
* @param segmentId Segment which has been changed.
*/
- synchronized void notifyObservers(long segmentId) {
+ void notifyObservers(long segmentId) {
observers.forEach(observer -> observer.accept(segmentId));
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/58dfe061/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
index 7840b09..0869356 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
@@ -31,6 +31,46 @@ import static org.junit.Assert.assertThat;
* Test for {@link SegmentAware}.
*/
public class SegmentAwareTest extends TestCase {
+
+ /**
+ * Checks that there is no deadlock between SegmentArchivedStorage.markAsMovedToArchive -> SegmentLockStorage.locked
+ * and SegmentLockStorage.releaseWorkSegment -> SegmentArchivedStorage.onSegmentUnlocked.
+ *
+ * @throws IgniteCheckedException if failed.
+ */
+ public void testAvoidDeadlockArchiverAndLockStorage() throws IgniteCheckedException {
+ SegmentAware aware = new SegmentAware(10, false);
+
+ int iterationCnt = 100_000;
+ int segmentToHandle = 1;
+
+ IgniteInternalFuture archiverThread = GridTestUtils.runAsync(() -> {
+ int i = iterationCnt;
+
+ while (i-- > 0) {
+ try {
+ aware.markAsMovedToArchive(segmentToHandle);
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+
+ IgniteInternalFuture lockerThread = GridTestUtils.runAsync(() -> {
+ int i = iterationCnt;
+
+ while (i-- > 0) {
+ aware.lockWorkSegment(segmentToHandle);
+
+ aware.releaseWorkSegment(segmentToHandle);
+ }
+ });
+
+ archiverThread.get();
+ lockerThread.get();
+ }
+
/**
* Waiting finished when work segment is set.
*/
@@ -435,7 +475,7 @@ public class SegmentAwareTest extends TestCase {
public void testLastCompressedIdxProperOrdering() throws IgniteInterruptedCheckedException {
SegmentAware aware = new SegmentAware(10, true);
- for (int i = 0; i < 5 ; i++) {
+ for (int i = 0; i < 5; i++) {
aware.setLastArchivedAbsoluteIndex(i);
aware.waitNextSegmentToCompress();
}