You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/23 13:07:31 UTC
[08/18] ignite git commit: ignite-gg-12163 fix for WAL history
reservation was ported, test was fixed
ignite-gg-12163 fix for WAL history reservation was ported, test was fixed
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2625dd89
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2625dd89
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2625dd89
Branch: refs/heads/ignite-5075-pds
Commit: 2625dd8903b6b5ecee2b314ca6916310e65ace5e
Parents: 89d0cd2
Author: Sergey Chugunov <se...@gmail.com>
Authored: Mon May 22 14:40:25 2017 +0300
Committer: Sergey Chugunov <se...@gmail.com>
Committed: Mon May 22 14:45:17 2017 +0300
----------------------------------------------------------------------
.../IgniteDhtPartitionHistorySuppliersMap.java | 3 +
.../GridCacheDatabaseSharedManager.java | 79 +++++++++++++-------
.../IgniteWalHistoryReservationsSelfTest.java | 13 +++-
3 files changed, 66 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/2625dd89/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java
index 333eb97..15294c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java
@@ -80,6 +80,9 @@ public class IgniteDhtPartitionHistorySuppliersMap implements Serializable {
* @param cntr Partition counter.
*/
public synchronized void put(UUID nodeId, int cacheId, int partId, long cntr) {
+ if (map == null)
+ map = new HashMap<>();
+
Map<T2<Integer, Integer>, Long> nodeMap = map.get(nodeId);
if (nodeMap == null) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/2625dd89/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java
index 08857ee..0ea8b68 100755
--- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java
+++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java
@@ -883,32 +883,23 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
if (part.state() != GridDhtPartitionState.OWNING || part.dataStore().size() <= ggWalRebalanceThreshold)
continue;
- for (Long cpTs : checkpointHist.checkpoints()) {
- try {
- CheckpointEntry entry = checkpointHist.entry(cpTs);
-
- if (!entry.cacheStates.containsKey(cacheCtx.cacheId()) ||
- !entry.cacheStates.get(cacheCtx.cacheId()).partitions().containsKey(part.id()))
- continue;
-
- WALPointer ptr = searchPartitionCounter(cacheCtx, part.id(), entry.checkpointTimestamp());
-
- if (ptr != null && cctx.wal().reserve(ptr)) {
- Map<Integer, T2<Long, WALPointer>> cacheMap = reservedForExchange.get(cacheCtx.cacheId());
+ CheckpointEntry cpEntry = searchCheckpointEntry(cacheCtx, part.id(), null);
- if (cacheMap == null) {
- cacheMap = new HashMap<>();
+ try {
+ if (cpEntry != null && cctx.wal().reserve(cpEntry.cpMark)) {
+ Map<Integer, T2<Long, WALPointer>> cacheMap = reservedForExchange.get(cacheCtx.cacheId());
- reservedForExchange.put(cacheCtx.cacheId(), cacheMap);
- }
+ if (cacheMap == null) {
+ cacheMap = new HashMap<>();
- cacheMap.put(part.id(), new T2<>(entry.partitionCounter(cacheCtx.cacheId(), part.id()), ptr));
+ reservedForExchange.put(cacheCtx.cacheId(), cacheMap);
}
- }
- catch (IgniteCheckedException ex) {
- U.error(log, "Error while trying to reserve history", ex);
- }
+ cacheMap.put(part.id(), new T2<>(cpEntry.partitionCounter(cacheCtx.cacheId(), part.id()), cpEntry.cpMark));
+ }
+ }
+ catch (IgniteCheckedException ex) {
+ U.error(log, "Error while trying to reserve history", ex);
}
}
}
@@ -948,7 +939,12 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/** {@inheritDoc} */
@Override public boolean reserveHistoryForPreloading(int cacheId, int partId, long cntr) {
- WALPointer ptr = searchPartitionCounter(cctx.cacheContext(cacheId), partId, cntr);
+ CheckpointEntry cpEntry = searchCheckpointEntry(cctx.cacheContext(cacheId), partId, cntr);
+
+ if (cpEntry == null)
+ return false;
+
+ WALPointer ptr = cpEntry.cpMark;
if (ptr == null)
return false;
@@ -1063,12 +1059,29 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
*
* @param cacheCtx Cache context.
* @param part Partition ID.
- * @param partCntrSince Partition counter.
- * @return WAL pointer or {@code null} if failed to search.
+ * @param partCntrSince Partition counter or {@code null} to search for minimal counter.
+ * @return WAL pointer or {@code null} if failed to search.
*/
- public WALPointer searchPartitionCounter(GridCacheContext cacheCtx, int part, Long partCntrSince) {
+ @Nullable public WALPointer searchPartitionCounter(GridCacheContext cacheCtx, int part, @Nullable Long partCntrSince) {
+ CheckpointEntry entry = searchCheckpointEntry(cacheCtx, part, partCntrSince);
+
+ if (entry == null)
+ return null;
+
+ return entry.cpMark;
+ }
+
+ /**
+ * Tries to search for a checkpoint entry for the given partition counter start.
+ *
+ * @param cacheCtx Cache context.
+ * @param part Partition ID.
+ * @param partCntrSince Partition counter or {@code null} to search for minimal counter.
+ * @return Checkpoint entry or {@code null} if failed to search.
+ */
+ @Nullable private CheckpointEntry searchCheckpointEntry(GridCacheContext cacheCtx, int part, @Nullable Long partCntrSince) {
boolean hasGap = false;
- WALPointer first = null;
+ CheckpointEntry first = null;
for (Long cpTs : checkpointHist.checkpoints()) {
try {
@@ -1077,8 +1090,18 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
Long foundCntr = entry.partitionCounter(cacheCtx.cacheId(), part);
if (foundCntr != null) {
- if (foundCntr <= partCntrSince) {
- first = entry.cpMark;
+ if (partCntrSince == null) {
+ if (hasGap) {
+ first = entry;
+
+ hasGap = false;
+ }
+
+ if (first == null)
+ first = entry;
+ }
+ else if (foundCntr <= partCntrSince) {
+ first = entry;
hasGap = false;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2625dd89/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/IgniteWalHistoryReservationsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/IgniteWalHistoryReservationsSelfTest.java b/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/IgniteWalHistoryReservationsSelfTest.java
index 82a6ac1..e241085 100644
--- a/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/IgniteWalHistoryReservationsSelfTest.java
+++ b/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/IgniteWalHistoryReservationsSelfTest.java
@@ -114,6 +114,17 @@ public class IgniteWalHistoryReservationsSelfTest extends GridCommonAbstractTest
forceCheckpoint();
+ for (int k = 0; k < entryCnt; k++)
+ cache.put(k, k * 2);
+
+ forceCheckpoint();
+
+ for (int k = 0; k < entryCnt; k++)
+ cache.put(k, k);
+
+ forceCheckpoint();
+
+
Lock lock = cache.lock(0);
lock.lock();
@@ -329,7 +340,7 @@ public class IgniteWalHistoryReservationsSelfTest extends GridCommonAbstractTest
assert reserved;
- stopGrid(initGridCnt - 1);
+ stopGrid(Integer.toString(initGridCnt - 1), true, false);
}
finally {
lock.unlock();