You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/05/22 17:44:31 UTC

[08/15] ignite git commit: ignite-gg-12163 fix for WAL history reservation was ported, test was fixed

ignite-gg-12163 fix for WAL history reservation was ported, test was fixed


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2625dd89
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2625dd89
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2625dd89

Branch: refs/heads/ignite-5267
Commit: 2625dd8903b6b5ecee2b314ca6916310e65ace5e
Parents: 89d0cd2
Author: Sergey Chugunov <se...@gmail.com>
Authored: Mon May 22 14:40:25 2017 +0300
Committer: Sergey Chugunov <se...@gmail.com>
Committed: Mon May 22 14:45:17 2017 +0300

----------------------------------------------------------------------
 .../IgniteDhtPartitionHistorySuppliersMap.java  |  3 +
 .../GridCacheDatabaseSharedManager.java         | 79 +++++++++++++-------
 .../IgniteWalHistoryReservationsSelfTest.java   | 13 +++-
 3 files changed, 66 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/2625dd89/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java
index 333eb97..15294c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java
@@ -80,6 +80,9 @@ public class IgniteDhtPartitionHistorySuppliersMap implements Serializable {
      * @param cntr Partition counter.
      */
     public synchronized void put(UUID nodeId, int cacheId, int partId, long cntr) {
+        if (map == null)
+            map = new HashMap<>();
+
         Map<T2<Integer, Integer>, Long> nodeMap = map.get(nodeId);
 
         if (nodeMap == null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/2625dd89/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java
index 08857ee..0ea8b68 100755
--- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java
+++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java
@@ -883,32 +883,23 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                 if (part.state() != GridDhtPartitionState.OWNING || part.dataStore().size() <= ggWalRebalanceThreshold)
                     continue;
 
-                for (Long cpTs : checkpointHist.checkpoints()) {
-                    try {
-                        CheckpointEntry entry = checkpointHist.entry(cpTs);
-
-                        if (!entry.cacheStates.containsKey(cacheCtx.cacheId()) ||
-                            !entry.cacheStates.get(cacheCtx.cacheId()).partitions().containsKey(part.id()))
-                            continue;
-
-                        WALPointer ptr = searchPartitionCounter(cacheCtx, part.id(), entry.checkpointTimestamp());
-
-                        if (ptr != null && cctx.wal().reserve(ptr)) {
-                            Map<Integer, T2<Long, WALPointer>> cacheMap = reservedForExchange.get(cacheCtx.cacheId());
+                CheckpointEntry cpEntry = searchCheckpointEntry(cacheCtx, part.id(), null);
 
-                            if (cacheMap == null) {
-                                cacheMap = new HashMap<>();
+                try {
+                    if (cpEntry != null && cctx.wal().reserve(cpEntry.cpMark)) {
+                        Map<Integer, T2<Long, WALPointer>> cacheMap = reservedForExchange.get(cacheCtx.cacheId());
 
-                                reservedForExchange.put(cacheCtx.cacheId(), cacheMap);
-                            }
+                        if (cacheMap == null) {
+                            cacheMap = new HashMap<>();
 
-                            cacheMap.put(part.id(), new T2<>(entry.partitionCounter(cacheCtx.cacheId(), part.id()), ptr));
+                            reservedForExchange.put(cacheCtx.cacheId(), cacheMap);
                         }
-                    }
-                    catch (IgniteCheckedException ex) {
-                        U.error(log, "Error while trying to reserve history", ex);
-                    }
 
+                        cacheMap.put(part.id(), new T2<>(cpEntry.partitionCounter(cacheCtx.cacheId(), part.id()), cpEntry.cpMark));
+                    }
+                }
+                catch (IgniteCheckedException ex) {
+                    U.error(log, "Error while trying to reserve history", ex);
                 }
             }
         }
@@ -948,7 +939,12 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
     /** {@inheritDoc} */
     @Override public boolean reserveHistoryForPreloading(int cacheId, int partId, long cntr) {
-        WALPointer ptr = searchPartitionCounter(cctx.cacheContext(cacheId), partId, cntr);
+        CheckpointEntry cpEntry = searchCheckpointEntry(cctx.cacheContext(cacheId), partId, cntr);
+
+        if (cpEntry == null)
+            return false;
+
+        WALPointer ptr = cpEntry.cpMark;
 
         if (ptr == null)
             return false;
@@ -1063,12 +1059,29 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
      *
      * @param cacheCtx Cache context.
      * @param part Partition ID.
-     * @param partCntrSince Partition counter.
-     * @return WAL pointer or {@code null} if failed to search.
+     * @param partCntrSince Partition counter or {@code null} to search for minimal counter.
+     * @return WAL pointer or {@code null} if failed to search.
      */
-    public WALPointer searchPartitionCounter(GridCacheContext cacheCtx, int part, Long partCntrSince) {
+    @Nullable public WALPointer searchPartitionCounter(GridCacheContext cacheCtx, int part, @Nullable Long partCntrSince) {
+        CheckpointEntry entry = searchCheckpointEntry(cacheCtx, part, partCntrSince);
+
+        if (entry == null)
+            return null;
+
+        return entry.cpMark;
+    }
+
+    /**
+     * Tries to search for a checkpoint entry for the given partition counter start.
+     *
+     * @param cacheCtx Cache context.
+     * @param part Partition ID.
+     * @param partCntrSince Partition counter or {@code null} to search for minimal counter.
+     * @return Checkpoint entry or {@code null} if failed to search.
+     */
+    @Nullable private CheckpointEntry searchCheckpointEntry(GridCacheContext cacheCtx, int part, @Nullable Long partCntrSince) {
         boolean hasGap = false;
-        WALPointer first = null;
+        CheckpointEntry first = null;
 
         for (Long cpTs : checkpointHist.checkpoints()) {
             try {
@@ -1077,8 +1090,18 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                 Long foundCntr = entry.partitionCounter(cacheCtx.cacheId(), part);
 
                 if (foundCntr != null) {
-                    if (foundCntr <= partCntrSince) {
-                        first = entry.cpMark;
+                    if (partCntrSince == null) {
+                        if (hasGap) {
+                            first = entry;
+
+                            hasGap = false;
+                        }
+
+                        if (first == null)
+                            first = entry;
+                    }
+                    else if (foundCntr <= partCntrSince) {
+                        first = entry;
 
                         hasGap = false;
                     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2625dd89/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/IgniteWalHistoryReservationsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/IgniteWalHistoryReservationsSelfTest.java b/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/IgniteWalHistoryReservationsSelfTest.java
index 82a6ac1..e241085 100644
--- a/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/IgniteWalHistoryReservationsSelfTest.java
+++ b/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/IgniteWalHistoryReservationsSelfTest.java
@@ -114,6 +114,17 @@ public class IgniteWalHistoryReservationsSelfTest extends GridCommonAbstractTest
 
         forceCheckpoint();
 
+        for (int k = 0; k < entryCnt; k++)
+            cache.put(k, k * 2);
+
+        forceCheckpoint();
+
+        for (int k = 0; k < entryCnt; k++)
+            cache.put(k, k);
+
+        forceCheckpoint();
+
+
         Lock lock = cache.lock(0);
 
         lock.lock();
@@ -329,7 +340,7 @@ public class IgniteWalHistoryReservationsSelfTest extends GridCommonAbstractTest
 
             assert reserved;
 
-            stopGrid(initGridCnt - 1);
+            stopGrid(Integer.toString(initGridCnt - 1), true, false);
         }
         finally {
             lock.unlock();