You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2019/08/29 09:55:06 UTC
[ignite] branch master updated: IGNITE-11584 Implement batch
insertion of new cache entries in FreeList to improve rebalancing (#6364)
This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 560f4833 IGNITE-11584 Implement batch insertion of new cache entries in FreeList to improve rebalancing (#6364)
560f4833 is described below
commit 560f48332f511edd0bd1efdc1e4d372491ef8779
Author: Pavel Pereslegin <xx...@gmail.com>
AuthorDate: Thu Aug 29 12:54:55 2019 +0300
IGNITE-11584 Implement batch insertion of new cache entries in FreeList to improve rebalancing (#6364)
---
.../processors/cache/GridCacheEntryEx.java | 39 ++-
.../processors/cache/GridCacheMapEntry.java | 35 ++-
.../cache/IgniteCacheOffheapManager.java | 25 ++
.../cache/IgniteCacheOffheapManagerImpl.java | 61 +++++
.../dht/preloader/GridDhtPartitionDemander.java | 224 +++++++---------
.../cache/persistence/DataRowCacheAware.java | 68 +++++
.../cache/persistence/GridCacheOffheapManager.java | 16 ++
.../IgniteCacheDatabaseSharedManager.java | 31 +--
.../processors/cache/persistence/RowStore.java | 15 +-
.../persistence/evict/NoOpPageEvictionTracker.java | 5 +
.../evict/PageAbstractEvictionTracker.java | 15 ++
.../persistence/evict/PageEvictionTracker.java | 7 +
.../persistence/freelist/AbstractFreeList.java | 283 +++++++++++++++++----
.../cache/persistence/freelist/FreeList.java | 14 +-
.../processors/cache/GridCacheTestEntryEx.java | 3 +-
.../processors/database/CacheFreeListSelfTest.java | 186 ++++++++++++--
16 files changed, 785 insertions(+), 242 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 9aec399..71702ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -792,6 +792,42 @@ public interface GridCacheEntryEx {
* @throws IgniteCheckedException In case of error.
* @throws GridCacheEntryRemovedException If entry was removed.
*/
+ default boolean initialValue(CacheObject val,
+ GridCacheVersion ver,
+ @Nullable MvccVersion mvccVer,
+ @Nullable MvccVersion newMvccVer,
+ byte mvccTxState,
+ byte newMvccTxState,
+ long ttl,
+ long expireTime,
+ boolean preload,
+ AffinityTopologyVersion topVer,
+ GridDrType drType,
+ boolean fromStore) throws IgniteCheckedException, GridCacheEntryRemovedException {
+ return initialValue(val, ver, null, null, TxState.NA, TxState.NA,
+ ttl, expireTime, preload, topVer, drType, fromStore, null);
+ }
+
+ /**
+ * Sets new value if current version is <tt>0</tt>
+ *
+ * @param val New value.
+ * @param ver Version to use.
+ * @param mvccVer Mvcc version.
+ * @param newMvccVer New mvcc version.
+ * @param mvccTxState Tx state hint for mvcc version.
+ * @param newMvccTxState Tx state hint for new mvcc version.
+ * @param ttl Time to live.
+ * @param expireTime Expiration time.
+ * @param preload Flag indicating whether entry is being preloaded.
+ * @param topVer Topology version.
+ * @param drType DR type.
+ * @param fromStore {@code True} if value was loaded from store.
+ * @param row Pre-created data row, associated with this cache entry.
+ * @return {@code True} if initial value was set.
+ * @throws IgniteCheckedException In case of error.
+ * @throws GridCacheEntryRemovedException If entry was removed.
+ */
public boolean initialValue(CacheObject val,
GridCacheVersion ver,
@Nullable MvccVersion mvccVer,
@@ -803,7 +839,8 @@ public interface GridCacheEntryEx {
boolean preload,
AffinityTopologyVersion topVer,
GridDrType drType,
- boolean fromStore) throws IgniteCheckedException, GridCacheEntryRemovedException;
+ boolean fromStore,
+ @Nullable CacheDataRow row) throws IgniteCheckedException, GridCacheEntryRemovedException;
/**
* Create versioned entry for this cache entry.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 194c97a..725c01a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -3310,7 +3310,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
boolean preload,
AffinityTopologyVersion topVer,
GridDrType drType,
- boolean fromStore
+ boolean fromStore,
+ CacheDataRow row
) throws IgniteCheckedException, GridCacheEntryRemovedException {
ensureFreeSpace();
@@ -3386,7 +3387,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
cctx.offheap().mvccInitialValue(this, val, ver, expTime, mvccVer, newMvccVer);
}
else
- storeValue(val, expTime, ver);
+ storeValue(val, expTime, ver, null, row);
}
}
else {
@@ -3417,7 +3418,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
else
// Optimization to access storage only once.
- update = storeValue(val, expTime, ver, p);
+ update = storeValue(val, expTime, ver, p, row);
}
if (update) {
@@ -4257,7 +4258,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
protected boolean storeValue(@Nullable CacheObject val,
long expireTime,
GridCacheVersion ver) throws IgniteCheckedException {
- return storeValue(val, expireTime, ver, null);
+ return storeValue(val, expireTime, ver, null, null);
}
/**
@@ -4267,6 +4268,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
* @param expireTime Expire time.
* @param ver New entry version.
* @param predicate Optional predicate.
+ * @param row Pre-created data row, associated with this cache entry.
* @return {@code True} if storage was modified.
* @throws IgniteCheckedException If update failed.
*/
@@ -4274,10 +4276,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
@Nullable CacheObject val,
long expireTime,
GridCacheVersion ver,
- @Nullable IgnitePredicate<CacheDataRow> predicate) throws IgniteCheckedException {
+ @Nullable IgnitePredicate<CacheDataRow> predicate,
+ @Nullable CacheDataRow row
+ ) throws IgniteCheckedException {
assert lock.isHeldByCurrentThread();
- UpdateClosure closure = new UpdateClosure(this, val, ver, expireTime, predicate);
+ UpdateClosure closure = new UpdateClosure(this, val, ver, expireTime, predicate, row);
cctx.offheap().invoke(cctx, key, localPartition(), closure);
@@ -5730,12 +5734,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
* @param predicate Optional predicate.
*/
UpdateClosure(GridCacheMapEntry entry, @Nullable CacheObject val, GridCacheVersion ver, long expireTime,
- @Nullable IgnitePredicate<CacheDataRow> predicate) {
+ @Nullable IgnitePredicate<CacheDataRow> predicate, @Nullable CacheDataRow newRow) {
this.entry = entry;
this.val = val;
this.ver = ver;
this.expireTime = expireTime;
this.predicate = predicate;
+ this.newRow = newRow;
}
/** {@inheritDoc} */
@@ -5755,13 +5760,15 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
if (val != null) {
- newRow = entry.cctx.offheap().dataStore(entry.localPartition()).createRow(
- entry.cctx,
- entry.key,
- val,
- ver,
- expireTime,
- oldRow);
+ if (newRow == null) {
+ newRow = entry.cctx.offheap().dataStore(entry.localPartition()).createRow(
+ entry.cctx,
+ entry.key,
+ val,
+ ver,
+ expireTime,
+ oldRow);
+ }
treeOp = oldRow != null && oldRow.link() == newRow.link() ?
IgniteTree.OperationType.IN_PLACE : IgniteTree.OperationType.PUT;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index ab8d338..e73ad52 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.processors.cache;
+import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.cache.Cache;
@@ -29,6 +31,7 @@ import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
+import org.apache.ignite.internal.processors.cache.persistence.DataRowCacheAware;
import org.apache.ignite.internal.processors.cache.persistence.RootPage;
import org.apache.ignite.internal.processors.cache.persistence.RowStore;
import org.apache.ignite.internal.processors.cache.persistence.freelist.SimpleDataRow;
@@ -47,6 +50,7 @@ import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.lang.GridIterator;
import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
+import org.apache.ignite.internal.util.lang.IgnitePredicateX;
import org.apache.ignite.lang.IgniteBiTuple;
import org.jetbrains.annotations.Nullable;
@@ -504,6 +508,17 @@ public interface IgniteCacheOffheapManager {
throws IgniteCheckedException;
/**
+ * Store entries.
+ *
+ * @param partId Partition number.
+ * @param infos Entry infos.
+ * @param initPred Applied to all created rows. Each row that not matches the predicate is removed.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void storeEntries(int partId, Iterator<GridCacheEntryInfo> infos,
+ IgnitePredicateX<CacheDataRow> initPred) throws IgniteCheckedException;
+
+ /**
* Clears offheap entries.
*
* @param cctx Cache context.
@@ -695,6 +710,16 @@ public interface IgniteCacheOffheapManager {
@Nullable CacheDataRow oldRow) throws IgniteCheckedException;
/**
+ * Insert rows into page memory.
+ *
+ * @param rows Rows.
+ * @param initPred Applied to all rows. Each row that not matches the predicate is removed.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void insertRows(Collection<DataRowCacheAware> rows,
+ IgnitePredicateX<CacheDataRow> initPred) throws IgniteCheckedException;
+
+ /**
* @param cctx Cache context.
* @param cleanupRows Rows to cleanup.
* @throws IgniteCheckedException If failed.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 57cca44..0d9b9fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -61,6 +62,7 @@ import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
+import org.apache.ignite.internal.processors.cache.persistence.DataRowCacheAware;
import org.apache.ignite.internal.processors.cache.persistence.RootPage;
import org.apache.ignite.internal.processors.cache.persistence.RowStore;
import org.apache.ignite.internal.processors.cache.persistence.freelist.SimpleDataRow;
@@ -110,6 +112,7 @@ import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.lang.GridIterator;
import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
+import org.apache.ignite.internal.util.lang.IgnitePredicateX;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -122,6 +125,7 @@ import org.jetbrains.annotations.Nullable;
import static java.lang.Boolean.TRUE;
import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX;
import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.TTL_ETERNAL;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.INITIAL_VERSION;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_COUNTER_NA;
@@ -145,6 +149,9 @@ import static org.apache.ignite.internal.util.IgniteTree.OperationType.PUT;
*
*/
public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager {
+ /** The maximum number of entries that can be preloaded under checkpoint read lock. */
+ public static final int PRELOAD_SIZE_UNDER_CHECKPOINT_LOCK = 100;
+
/** */
private final boolean failNodeOnPartitionInconsistency = Boolean.getBoolean(
IgniteSystemProperties.IGNITE_FAIL_NODE_ON_UNRECOVERABLE_PARTITION_INCONSISTENCY);
@@ -1207,6 +1214,37 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
/** {@inheritDoc} */
+ @Override public void storeEntries(int partId, Iterator<GridCacheEntryInfo> infos,
+ IgnitePredicateX<CacheDataRow> initPred) throws IgniteCheckedException {
+ CacheDataStore dataStore = dataStore(partId);
+
+ List<DataRowCacheAware> batch = new ArrayList<>(PRELOAD_SIZE_UNDER_CHECKPOINT_LOCK);
+
+ while (infos.hasNext()) {
+ GridCacheEntryInfo info = infos.next();
+
+ assert info.ttl() == TTL_ETERNAL : info.ttl();
+
+ batch.add(new DataRowCacheAware(info.key(),
+ info.value(),
+ info.version(),
+ partId,
+ info.expireTime(),
+ info.cacheId(),
+ grp.storeCacheIdInDataPage()));
+
+ if (batch.size() == PRELOAD_SIZE_UNDER_CHECKPOINT_LOCK) {
+ dataStore.insertRows(batch, initPred);
+
+ batch.clear();
+ }
+ }
+
+ if (!batch.isEmpty())
+ dataStore.insertRows(batch, initPred);
+ }
+
+ /** {@inheritDoc} */
@Override public final CacheDataStore createCacheDataStore(int p) throws IgniteCheckedException {
CacheDataStore dataStore;
@@ -1713,6 +1751,29 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
return dataRow;
}
+ /** {@inheritDoc} */
+ @Override public void insertRows(Collection<DataRowCacheAware> rows,
+ IgnitePredicateX<CacheDataRow> initPred) throws IgniteCheckedException {
+ if (!busyLock.enterBusy())
+ throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
+
+ try {
+ rowStore.addRows(F.view(rows, row -> row.value() != null), grp.statisticsHolderData());
+
+ boolean cacheIdAwareGrp = grp.sharedGroup() || grp.storeCacheIdInDataPage();
+
+ for (DataRowCacheAware row : rows) {
+ row.storeCacheId(cacheIdAwareGrp);
+
+ if (!initPred.apply(row) && row.value() != null)
+ rowStore.removeRow(row.link(), grp.statisticsHolderData());
+ }
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+
/**
* @param key Cache key.
* @param val Cache value.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 9386193..1b471f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -56,15 +56,15 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccUpdateVersionAware;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccVersionAware;
import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.IgniteInClosureX;
+import org.apache.ignite.internal.util.lang.IgnitePredicateX;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
@@ -80,6 +80,8 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_LOAD
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_LOADED;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED;
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.TTL_ETERNAL;
+import static org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl.PRELOAD_SIZE_UNDER_CHECKPOINT_LOCK;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING;
import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRELOAD;
@@ -752,10 +754,16 @@ public class GridDhtPartitionDemander {
try {
Iterator<GridCacheEntryInfo> infos = e.getValue().infos().iterator();
- if (grp.mvccEnabled())
- mvccPreloadEntries(topVer, node, p, infos);
- else
- preloadEntries(topVer, node, p, infos);
+ try {
+ if (grp.mvccEnabled())
+ mvccPreloadEntries(topVer, node, p, infos);
+ else
+ preloadEntries(topVer, p, infos);
+ }
+ catch (GridDhtInvalidPartitionException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Partition became invalid during rebalancing (will ignore): " + p);
+ }
fut.processed.get(p).increment();
@@ -890,7 +898,7 @@ public class GridDhtPartitionDemander {
ctx.database().checkpointReadLock();
try {
- for (int i = 0; i < 100; i++) {
+ for (int i = 0; i < PRELOAD_SIZE_UNDER_CHECKPOINT_LOCK; i++) {
boolean hasMore = infos.hasNext();
assert hasMore || !entryHist.isEmpty();
@@ -922,20 +930,9 @@ public class GridDhtPartitionDemander {
}
if (cctx != null) {
- if (!mvccPreloadEntry(cctx, node, entryHist, topVer, p)) {
- if (log.isTraceEnabled())
- log.trace("Got entries for invalid partition during " +
- "preloading (will skip) [p=" + p +
- ", entry=" + entryHist.get(entryHist.size() - 1) + ']');
+ mvccPreloadEntry(cctx, node, entryHist, topVer, p);
- return; // Skip current partition.
- }
-
- //TODO: IGNITE-11330: Update metrics for touched cache only.
- for (GridCacheContext ctx : grp.caches()) {
- if (ctx.statisticsEnabled())
- ctx.cache().metrics0().onRebalanceKeyReceived();
- }
+ updateGroupMetrics();
}
if (!hasMore)
@@ -954,140 +951,100 @@ public class GridDhtPartitionDemander {
}
/**
- * Adds entries with theirs history to partition p.
+ * Adds entries to partition p.
*
- * @param node Node which sent entry.
+ * @param topVer Topology version.
* @param p Partition id.
* @param infos Entries info for preload.
- * @param topVer Topology version.
- * @throws IgniteInterruptedCheckedException If interrupted.
+ * @throws IgniteCheckedException If failed.
*/
- private void preloadEntries(AffinityTopologyVersion topVer, ClusterNode node, int p,
+ private void preloadEntries(AffinityTopologyVersion topVer, int p,
Iterator<GridCacheEntryInfo> infos) throws IgniteCheckedException {
- GridCacheContext cctx = null;
-
- // Loop through all received entries and try to preload them.
- while (infos.hasNext()) {
- ctx.database().checkpointReadLock();
-
- try {
- for (int i = 0; i < 100; i++) {
- if (!infos.hasNext())
- break;
-
- GridCacheEntryInfo entry = infos.next();
-
- if (cctx == null || (grp.sharedGroup() && entry.cacheId() != cctx.cacheId())) {
- cctx = grp.sharedGroup() ? grp.shared().cacheContext(entry.cacheId()) : grp.singleCacheContext();
-
- if (cctx == null)
- continue;
- else if (cctx.isNear())
- cctx = cctx.dhtCache().context();
- }
- if (!preloadEntry(node, p, entry, topVer, cctx)) {
- if (log.isTraceEnabled())
- log.trace("Got entries for invalid partition during " +
- "preloading (will skip) [p=" + p + ", entry=" + entry + ']');
-
- return;
- }
-
- //TODO: IGNITE-11330: Update metrics for touched cache only.
- for (GridCacheContext ctx : grp.caches()) {
- if (ctx.statisticsEnabled())
- ctx.cache().metrics0().onRebalanceKeyReceived();
- }
- }
- }
- finally {
- ctx.database().checkpointReadUnlock();
+ grp.offheap().storeEntries(p, infos, new IgnitePredicateX<CacheDataRow>() {
+ @Override public boolean applyx(CacheDataRow row) throws IgniteCheckedException {
+ return preloadEntry(row, topVer);
}
- }
+ });
}
/**
* Adds {@code entry} to partition {@code p}.
*
- * @param from Node which sent entry.
- * @param p Partition id.
- * @param entry Preloaded entry.
+ * @param row Data row.
* @param topVer Topology version.
- * @param cctx Cache context.
- * @return {@code False} if partition has become invalid during preloading.
+ * @return {@code True} if the initial value was set for the specified cache entry.
* @throws IgniteInterruptedCheckedException If interrupted.
*/
- private boolean preloadEntry(
- ClusterNode from,
- int p,
- GridCacheEntryInfo entry,
- AffinityTopologyVersion topVer,
- GridCacheContext cctx
- ) throws IgniteCheckedException {
+ private boolean preloadEntry(CacheDataRow row, AffinityTopologyVersion topVer) throws IgniteCheckedException {
+ assert !grp.mvccEnabled();
assert ctx.database().checkpointLockIsHeldByThread();
- try {
- GridCacheEntryEx cached = null;
+ updateGroupMetrics();
- try {
- cached = cctx.cache().entryEx(entry.key(), topVer);
+ GridCacheContext cctx = grp.sharedGroup() ? ctx.cacheContext(row.cacheId()) : grp.singleCacheContext();
- if (log.isTraceEnabled()) {
- log.trace("Rebalancing key [key=" + entry.key() + ", part=" + p + ", fromNode=" +
- from.id() + ", grpId=" + grp.groupId() + ']');
- }
+ if (cctx == null)
+ return false;
- if (cached.initialValue(
- entry.value(),
- entry.version(),
- cctx.mvccEnabled() ? ((MvccVersionAware)entry).mvccVersion() : null,
- cctx.mvccEnabled() ? ((MvccUpdateVersionAware)entry).newMvccVersion() : null,
- cctx.mvccEnabled() ? ((MvccVersionAware)entry).mvccTxState() : TxState.NA,
- cctx.mvccEnabled() ? ((MvccUpdateVersionAware)entry).newMvccTxState() : TxState.NA,
- entry.ttl(),
- entry.expireTime(),
- true,
- topVer,
- cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE,
- false
- )) {
- cached.touch(); // Start tracking.
+ cctx = cctx.isNear() ? cctx.dhtCache().context() : cctx;
- if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED) && !cached.isInternal())
- cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(), null,
- null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, entry.value(), true, null,
- false, null, null, null, true);
- }
- else {
- cached.touch(); // Start tracking.
+ GridCacheEntryEx cached = cctx.cache().entryEx(row.key(), topVer);
- if (log.isTraceEnabled())
- log.trace("Rebalancing entry is already in cache (will ignore) [key=" + cached.key() +
- ", part=" + p + ']');
- }
+ try {
+ if (log.isTraceEnabled()) {
+ log.trace("Rebalancing key [key=" + cached.key() + ", part=" + cached.partition() +
+ ", grpId=" + grp.groupId() + ']');
}
- catch (GridCacheEntryRemovedException ignored) {
- if (log.isTraceEnabled())
- log.trace("Entry has been concurrently removed while rebalancing (will ignore) [key=" +
- cached.key() + ", part=" + p + ']');
+
+ assert row.expireTime() >= 0 : row.expireTime();
+
+ if (cached.initialValue(
+ row.value(),
+ row.version(),
+ null,
+ null,
+ TxState.NA,
+ TxState.NA,
+ TTL_ETERNAL,
+ row.expireTime(),
+ true,
+ topVer,
+ cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE,
+ false,
+ row
+ )) {
+ cached.touch(); // Start tracking.
+
+ if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED) && !cached.isInternal())
+ cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(), null,
+ null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, row.value(), true, null,
+ false, null, null, null, true);
+
+ return true;
}
- catch (GridDhtInvalidPartitionException ignored) {
- if (log.isDebugEnabled())
- log.debug("Partition became invalid during rebalancing (will ignore): " + p);
+ else {
+ cached.touch(); // Start tracking.
- return false;
+ if (log.isTraceEnabled())
+ log.trace("Rebalancing entry is already in cache (will ignore) [key=" + cached.key() +
+ ", part=" + cached.partition() + ']');
}
}
+ catch (GridCacheEntryRemovedException ignored) {
+ if (log.isTraceEnabled())
+ log.trace("Entry has been concurrently removed while rebalancing (will ignore) [key=" +
+ cached.key() + ", part=" + cached.partition() + ']');
+ }
catch (IgniteInterruptedCheckedException e) {
throw e;
}
catch (IgniteCheckedException e) {
- throw new IgniteCheckedException("Failed to cache rebalanced entry (will stop rebalancing) [local=" +
- ctx.localNode() + ", node=" + from.id() + ", key=" + entry.key() + ", part=" + p + ']', e);
+ throw new IgniteCheckedException("Failed to cache rebalanced entry (will stop rebalancing) [" +
+ "key=" + row.key() + ", part=" + row.partition() + ']', e);
}
- return true;
+ return false;
}
/**
@@ -1098,7 +1055,7 @@ public class GridDhtPartitionDemander {
* @param history Mvcc entry history.
* @param topVer Topology version.
* @param p Partition id.
- * @return {@code False} if partition has become invalid during preloading.
+ * @return {@code True} if the initial value was set for the specified cache entry.
* @throws IgniteInterruptedCheckedException If interrupted.
*/
private boolean mvccPreloadEntry(
@@ -1131,6 +1088,8 @@ public class GridDhtPartitionDemander {
cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(), null,
null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, null, true, null,
false, null, null, null, true);
+
+ return true;
}
else {
cached.touch(); // Start tracking.
@@ -1145,12 +1104,6 @@ public class GridDhtPartitionDemander {
log.trace("Entry has been concurrently removed while rebalancing (will ignore) [key=" +
cached.key() + ", part=" + p + ']');
}
- catch (GridDhtInvalidPartitionException ignored) {
- if (log.isDebugEnabled())
- log.debug("Partition became invalid during rebalancing (will ignore): " + p);
-
- return false;
- }
}
catch (IgniteInterruptedCheckedException | ClusterTopologyCheckedException e) {
throw e;
@@ -1160,7 +1113,7 @@ public class GridDhtPartitionDemander {
ctx.localNode() + ", node=" + from.id() + ", key=" + info.key() + ", part=" + p + ']', e);
}
- return true;
+ return false;
}
/**
@@ -1173,6 +1126,19 @@ public class GridDhtPartitionDemander {
return "grp=" + grp.cacheOrGroupName() + ", topVer=" + supplyMsg.topologyVersion() + ", supplier=" + supplier;
}
+ /**
+ * Update rebalancing metrics.
+ */
+ private void updateGroupMetrics() {
+ // TODO: IGNITE-11330: Update metrics for touched cache only.
+ // Due to historical rebalancing "EstimatedRebalancingKeys" metric is currently calculated for the whole cache
+ // group (by partition counters), so "RebalancedKeys" and "RebalancingKeysRate" is calculated in the same way.
+ for (GridCacheContext cctx0 : grp.caches()) {
+ if (cctx0.statisticsEnabled())
+ cctx0.cache().metrics0().onRebalanceKeyReceived();
+ }
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridDhtPartitionDemander.class, this);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataRowCacheAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataRowCacheAware.java
new file mode 100644
index 0000000..baee401
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataRowCacheAware.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence;
+
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.tree.DataRow;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Data row implementation that can optionally hide the cache identifier and can set {@code null} as value.<br> It is
+ * used to simplify storing a data row into page memory, because in some cases the cache identifier is not stored on the
+ * data pages, but is required to link this data row in {@code BPlusTree}.
+ */
+public class DataRowCacheAware extends DataRow {
+ /** Flag indicates that cacheId should be stored in data page. */
+ private boolean storeCacheId;
+
+ /**
+ * @param key Key.
+ * @param val Value.
+ * @param ver Version.
+ * @param part Partition.
+ * @param expireTime Expire time.
+ * @param cacheId Cache ID.
+ * @param storeCacheId Flag indicates that cacheId should be stored in data page.
+ */
+ public DataRowCacheAware(KeyCacheObject key, @Nullable CacheObject val, GridCacheVersion ver, int part,
+ long expireTime, int cacheId, boolean storeCacheId) {
+ super(key, val, ver, part, expireTime, cacheId);
+
+ storeCacheId(storeCacheId);
+ }
+
+ /**
+ * @param storeCacheId Flag indicates that cacheId should be stored in data page.
+ */
+ public void storeCacheId(boolean storeCacheId) {
+ this.storeCacheId = storeCacheId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int cacheId() {
+ return storeCacheId ? cacheId : CU.UNDEFINED_CACHE_ID;
+ }
+
+ /** {@inheritDoc} */
+ @Override public @Nullable CacheObject value() {
+ return val;
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 0b027f1..bbddc1c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.persistence;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -101,6 +102,7 @@ import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner;
import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
+import org.apache.ignite.internal.util.lang.IgnitePredicateX;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -2383,6 +2385,20 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
}
/** {@inheritDoc} */
+ @Override public void insertRows(Collection<DataRowCacheAware> rows,
+ IgnitePredicateX<CacheDataRow> initPred) throws IgniteCheckedException {
+ CacheDataStore delegate = init0(false);
+
+ ctx.database().checkpointReadLock();
+
+ try {
+ delegate.insertRows(rows, initPred);
+ } finally {
+ ctx.database().checkpointReadUnlock();
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public int cleanup(GridCacheContext cctx,
@Nullable List<MvccLinkAwareSearchRow> cleanupRows) throws IgniteCheckedException {
CacheDataStore delegate = init0(false);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
index fb40901..aad30fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
@@ -994,35 +994,12 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
if (memPlc == null)
return;
- DataRegionConfiguration plcCfg = memPlc.config();
+ while (memPlc.evictionTracker().evictionRequired()) {
+ warnFirstEvict(memPlc.config());
- if (plcCfg.getPageEvictionMode() == DataPageEvictionMode.DISABLED || plcCfg.isPersistenceEnabled())
- return;
-
- long memorySize = plcCfg.getMaxSize();
-
- PageMemory pageMem = memPlc.pageMemory();
-
- int sysPageSize = pageMem.systemPageSize();
-
- CacheFreeList freeList = freeListMap.get(plcCfg.getName());
-
- for (;;) {
- long allocatedPagesCnt = pageMem.loadedPages();
-
- int emptyDataPagesCnt = freeList.emptyDataPages();
-
- boolean shouldEvict = allocatedPagesCnt > (memorySize / sysPageSize * plcCfg.getEvictionThreshold()) &&
- emptyDataPagesCnt < plcCfg.getEmptyPagesPoolSize();
-
- if (shouldEvict) {
- warnFirstEvict(plcCfg);
-
- memPlc.evictionTracker().evictDataPage();
+ memPlc.evictionTracker().evictDataPage();
- memPlc.memoryMetrics().updateEvictionRate();
- } else
- break;
+ memPlc.memoryMetrics().updateEvictionRate();
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java
index bdd1c2d..8601d98 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java
@@ -17,7 +17,9 @@
package org.apache.ignite.internal.processors.cache.persistence;
+import java.util.Collection;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.metric.IoStatisticsHolder;
import org.apache.ignite.internal.pagemem.PageIdUtils;
import org.apache.ignite.internal.pagemem.PageMemory;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
@@ -26,7 +28,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.persistence.freelist.FreeList;
import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler;
import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner;
-import org.apache.ignite.internal.metric.IoStatisticsHolder;
import org.apache.ignite.internal.util.typedef.internal.U;
/**
@@ -118,6 +119,18 @@ public class RowStore {
}
/**
+ * @param rows Rows.
+ * @param statHolder Statistics holder to track IO operations.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void addRows(Collection<? extends CacheDataRow> rows,
+ IoStatisticsHolder statHolder) throws IgniteCheckedException {
+ assert ctx.database().checkpointLockIsHeldByThread();
+
+ freeList.insertDataRows(rows, statHolder);
+ }
+
+ /**
* @param link Row link.
* @param row New row data.
* @return {@code True} if was able to update row.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/evict/NoOpPageEvictionTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/evict/NoOpPageEvictionTracker.java
index b420ecd..1d0b171 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/evict/NoOpPageEvictionTracker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/evict/NoOpPageEvictionTracker.java
@@ -47,4 +47,9 @@ public class NoOpPageEvictionTracker implements PageEvictionTracker {
@Override public void forgetPage(long pageId) throws IgniteCheckedException {
// No-op.
}
+
+ /** {@inheritDoc} */
+ @Override public boolean evictionRequired() {
+ return false;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/evict/PageAbstractEvictionTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/evict/PageAbstractEvictionTracker.java
index 41731b1..703fc6c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/evict/PageAbstractEvictionTracker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/evict/PageAbstractEvictionTracker.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
+import org.apache.ignite.internal.processors.cache.persistence.freelist.AbstractFreeList;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager;
@@ -52,6 +53,9 @@ public abstract class PageAbstractEvictionTracker implements PageEvictionTracker
/** Shared context. */
private final GridCacheSharedContext sharedCtx;
+ /** Data region configuration. */
+ private final DataRegionConfiguration regCfg;
+
/**
* @param pageMem Page memory.
* @param plcCfg Data region configuration.
@@ -66,12 +70,23 @@ public abstract class PageAbstractEvictionTracker implements PageEvictionTracker
this.sharedCtx = sharedCtx;
+ regCfg = plcCfg;
+
trackingSize = pageMem.totalPages();
baseCompactTs = (U.currentTimeMillis() - DAY) >> COMPACT_TS_SHIFT;
// We subtract day to avoid fail in case of daylight shift or timezone change.
}
+ /** {@inheritDoc} */
+ @Override public boolean evictionRequired() {
+ AbstractFreeList freeList = (AbstractFreeList)sharedCtx.database().freeList(regCfg.getName());
+
+ double pagesThreshold = regCfg.getEvictionThreshold() * regCfg.getMaxSize() / pageMem.systemPageSize();
+
+ return pageMem.loadedPages() > pagesThreshold && freeList.emptyDataPages() < regCfg.getEmptyPagesPoolSize();
+ }
+
/**
* @param pageIdx Page index.
* @return true if at least one data row has been evicted
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/evict/PageEvictionTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/evict/PageEvictionTracker.java
index baa5462..fa17ea4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/evict/PageEvictionTracker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/evict/PageEvictionTracker.java
@@ -33,6 +33,13 @@ public interface PageEvictionTracker extends LifecycleAware {
public void touchPage(long pageId) throws IgniteCheckedException;
/**
+ * Check if page eviction is required according to the configured policy.
+ *
+ * @return {@code True} if eviction required.
+ */
+ public boolean evictionRequired();
+
+ /**
* Evicts one data page.
* In most cases, all entries will be removed from the page.
* Method guarantees removing at least one entry from "evicted" data page. Removing all entries may be
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java
index b677116..cb32219 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java
@@ -17,9 +17,12 @@
package org.apache.ignite.internal.processors.cache.persistence.freelist;
+import java.util.Collection;
import java.util.concurrent.atomic.AtomicReferenceArray;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.metric.IoStatisticsHolder;
+import org.apache.ignite.internal.metric.IoStatisticsHolderNoOp;
import org.apache.ignite.internal.pagemem.PageIdAllocator;
import org.apache.ignite.internal.pagemem.PageIdUtils;
import org.apache.ignite.internal.pagemem.PageUtils;
@@ -40,8 +43,8 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseB
import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler;
import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageLockListener;
-import org.apache.ignite.internal.metric.IoStatisticsHolder;
-import org.apache.ignite.internal.metric.IoStatisticsHolderNoOp;
+import org.apache.ignite.internal.util.GridCursorIteratorWrapper;
+import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.typedef.internal.U;
/**
@@ -130,13 +133,15 @@ public abstract class AbstractFreeList<T extends Storable> extends PagesList imp
}
}
- /** */
- private final PageHandler<T, Integer> writeRow = new WriteRowHandler();
+ /** Write a single row on a single page. */
+ private final WriteRowHandler writeRowHnd = new WriteRowHandler();
- /**
- *
- */
- private final class WriteRowHandler extends PageHandler<T, Integer> {
+ /** Write multiple rows on a single page. */
+ private final WriteRowsHandler writeRowsHnd = new WriteRowsHandler();
+
+ /** */
+ private class WriteRowHandler extends PageHandler<T, Integer> {
+ /** {@inheritDoc} */
@Override public Integer run(
int cacheId,
long pageId,
@@ -148,6 +153,31 @@ public abstract class AbstractFreeList<T extends Storable> extends PagesList imp
int written,
IoStatisticsHolder statHolder)
throws IgniteCheckedException {
+ written = addRow(pageId, page, pageAddr, iox, row, written);
+
+ putPage(((AbstractDataPageIO)iox).getFreeSpace(pageAddr), pageId, page, pageAddr, statHolder);
+
+ return written;
+ }
+
+ /**
+ * @param pageId Page ID.
+ * @param page Page absolute pointer.
+ * @param pageAddr Page address.
+ * @param iox IO.
+ * @param row Row to write.
+ * @param written Written size.
+ * @return Number of bytes written, {@link #COMPLETE} if the row was fully written.
+ * @throws IgniteCheckedException If failed.
+ */
+ protected Integer addRow(
+ long pageId,
+ long page,
+ long pageAddr,
+ PageIO iox,
+ T row,
+ int written)
+ throws IgniteCheckedException {
AbstractDataPageIO<T> io = (AbstractDataPageIO<T>)iox;
int rowSize = row.size();
@@ -156,18 +186,9 @@ public abstract class AbstractFreeList<T extends Storable> extends PagesList imp
assert oldFreeSpace > 0 : oldFreeSpace;
// If the full row does not fit into this page write only a fragment.
- written = (written == 0 && oldFreeSpace >= rowSize) ? addRow(pageId, page, pageAddr, io, row, rowSize) :
+ written = (written == 0 && oldFreeSpace >= rowSize) ? addRowFull(pageId, page, pageAddr, io, row, rowSize) :
addRowFragment(pageId, page, pageAddr, io, row, written, rowSize);
- // Reread free space after update.
- int newFreeSpace = io.getFreeSpace(pageAddr);
-
- if (newFreeSpace > MIN_PAGE_FREE_SPACE) {
- int bucket = bucket(newFreeSpace, false);
-
- put(null, pageId, page, pageAddr, bucket, statHolder);
- }
-
if (written == rowSize)
evictionTracker.touchPage(pageId);
@@ -185,7 +206,7 @@ public abstract class AbstractFreeList<T extends Storable> extends PagesList imp
* @return Written size which is always equal to row size here.
* @throws IgniteCheckedException If failed.
*/
- private int addRow(
+ protected int addRowFull(
long pageId,
long page,
long pageAddr,
@@ -225,7 +246,7 @@ public abstract class AbstractFreeList<T extends Storable> extends PagesList imp
* @return Updated written size.
* @throws IgniteCheckedException If failed.
*/
- private int addRowFragment(
+ protected int addRowFragment(
long pageId,
long page,
long pageAddr,
@@ -254,6 +275,66 @@ public abstract class AbstractFreeList<T extends Storable> extends PagesList imp
return written + payloadSize;
}
+
+ /**
+ * Put page into the free list if needed.
+ *
+ * @param freeSpace Page free space.
+ * @param pageId Page ID.
+ * @param page Page pointer.
+ * @param pageAddr Page address.
+ * @param statHolder Statistics holder to track IO operations.
+ */
+ protected void putPage(int freeSpace, long pageId, long page, long pageAddr, IoStatisticsHolder statHolder)
+ throws IgniteCheckedException {
+ if (freeSpace > MIN_PAGE_FREE_SPACE) {
+ int bucket = bucket(freeSpace, false);
+
+ put(null, pageId, page, pageAddr, bucket, statHolder);
+ }
+ }
+ }
+
+ /** */
+ private final class WriteRowsHandler extends PageHandler<GridCursor<T>, Integer> {
+ /** {@inheritDoc} */
+ @Override public Integer run(
+ int cacheId,
+ long pageId,
+ long page,
+ long pageAddr,
+ PageIO iox,
+ Boolean walPlc,
+ GridCursor<T> cur,
+ int written,
+ IoStatisticsHolder statHolder)
+ throws IgniteCheckedException {
+ AbstractDataPageIO<T> io = (AbstractDataPageIO<T>)iox;
+
+ // Fill the page up to the end.
+ while (written != COMPLETE || (!evictionTracker.evictionRequired() && cur.next())) {
+ T row = cur.get();
+
+ if (written == COMPLETE) {
+ // If the data row was completely written without remainder, proceed to the next.
+ if ((written = writeWholePages(row, statHolder)) == COMPLETE)
+ continue;
+
+ if (io.getFreeSpace(pageAddr) < row.size() - written)
+ break;
+ }
+
+ written = writeRowHnd.addRow(pageId, page, pageAddr, io, row, written);
+
+ assert written == COMPLETE;
+
+ evictionTracker.touchPage(pageId);
+ }
+
+ writeRowHnd.putPage(io.getFreeSpace(pageAddr), pageId, page, pageAddr, statHolder);
+
+ return written;
+ }
}
/** */
@@ -473,8 +554,6 @@ public abstract class AbstractFreeList<T extends Storable> extends PagesList imp
/** {@inheritDoc} */
@Override public void insertDataRow(T row, IoStatisticsHolder statHolder) throws IgniteCheckedException {
- int rowSize = row.size();
-
int written = 0;
try {
@@ -482,62 +561,168 @@ public abstract class AbstractFreeList<T extends Storable> extends PagesList imp
if (written != 0)
memMetrics.incrementLargeEntriesPages();
- int remaining = rowSize - written;
+ written = writeSinglePage(row, written, statHolder);
+ }
+ while (written != COMPLETE);
+ }
+ catch (IgniteCheckedException | Error e) {
+ throw e;
+ }
+ catch (Throwable t) {
+ throw new CorruptedFreeListException("Failed to insert data row", t);
+ }
+ }
- long pageId = 0L;
+ /**
+ * Reduces the workload on the free list by writing multiple rows into a single memory page at once.<br>
+ * <br>
+ * Rows are sequentially added to the page as long as there is enough free space on it. If the row is large then
+ * those fragments that occupy the whole memory page are written to other pages, and the remainder is added to the
+ * current one.
+ *
+ * @param rows Rows.
+ * @param statHolder Statistics holder to track IO operations.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Override public void insertDataRows(Collection<T> rows,
+ IoStatisticsHolder statHolder) throws IgniteCheckedException {
+ try {
+ GridCursor<T> cur = new GridCursorIteratorWrapper<>(rows.iterator());
- if (remaining < MIN_SIZE_FOR_DATA_PAGE) {
- for (int b = bucket(remaining, false) + 1; b < BUCKETS - 1; b++) {
- pageId = takeEmptyPage(b, row.ioVersions(), statHolder);
+ int written = COMPLETE;
- if (pageId != 0L)
- break;
- }
+ while (written != COMPLETE || cur.next()) {
+ T row = cur.get();
+
+ // If eviction is required - free up memory before locking the next page.
+ while (evictionTracker.evictionRequired()) {
+ evictionTracker.evictDataPage();
+
+ memMetrics.updateEvictionRate();
}
- if (pageId == 0L) { // Handle reuse bucket.
- if (reuseList == this)
- pageId = takeEmptyPage(REUSE_BUCKET, row.ioVersions(), statHolder);
- else
- pageId = reuseList.takeRecycledPage();
+ if (written == COMPLETE) {
+ written = writeWholePages(row, statHolder);
+
+ continue;
}
AbstractDataPageIO initIo = null;
+ long pageId = takePage(row.size() - written, row, statHolder);
+
if (pageId == 0L) {
pageId = allocateDataPage(row.partition());
initIo = row.ioVersions().latest();
}
- else if (PageIdUtils.tag(pageId) != PageIdAllocator.FLAG_DATA) // Page is taken from reuse bucket.
- pageId = initReusedPage(row, pageId, row.partition(), statHolder);
- else // Page is taken from free space bucket. For in-memory mode partition must be changed.
- pageId = PageIdUtils.changePartitionId(pageId, (row.partition()));
- written = write(pageId, writeRow, initIo, row, written, FAIL_I, statHolder);
+ written = write(pageId, writeRowsHnd, initIo, cur, written, FAIL_I, statHolder);
assert written != FAIL_I; // We can't fail here.
}
- while (written != COMPLETE);
}
- catch (IgniteCheckedException | Error e) {
- throw e;
+ catch (RuntimeException e) {
+ throw new CorruptedFreeListException("Failed to insert data rows", e);
}
- catch (Throwable t) {
- throw new CorruptedFreeListException("Failed to insert data row", t);
+ }
+
+ /**
+ * Write fragments of the row, which occupy the whole memory page. A data row is ignored if it is less than the max
+ * payload of an empty data page.
+ *
+ * @param row Row to process.
+ * @param statHolder Statistics holder to track IO operations.
+ * @return Number of bytes written, {@link #COMPLETE} if the row was fully written, {@code 0} if data row was
+ * ignored because it is less than the max payload of an empty data page.
+ * @throws IgniteCheckedException If failed.
+ */
+ private int writeWholePages(T row, IoStatisticsHolder statHolder) throws IgniteCheckedException {
+ assert row.link() == 0 : row.link();
+
+ int written = 0;
+ int rowSize = row.size();
+
+ while (rowSize - written >= MIN_SIZE_FOR_DATA_PAGE) {
+ written = writeSinglePage(row, written, statHolder);
+
+ memMetrics.incrementLargeEntriesPages();
}
+
+ return written;
}
/**
+ * Take a page and write row on it.
+ *
+ * @param row Row to write.
+ * @param written Written size.
+ * @param statHolder Statistics holder to track IO operations.
+ * @return Number of bytes written, {@link #COMPLETE} if the row was fully written.
+ * @throws IgniteCheckedException If failed.
+ */
+ private int writeSinglePage(T row, int written, IoStatisticsHolder statHolder) throws IgniteCheckedException {
+ AbstractDataPageIO initIo = null;
+
+ long pageId = takePage(row.size() - written, row, statHolder);
+
+ if (pageId == 0L) {
+ pageId = allocateDataPage(row.partition());
+
+ initIo = row.ioVersions().latest();
+ }
+
+ written = write(pageId, writeRowHnd, initIo, row, written, FAIL_I, statHolder);
+
+ assert written != FAIL_I; // We can't fail here.
+
+ return written;
+ }
+
+ /**
+ * Take page from free list.
+ *
+ * @param size Required free space on page.
+ * @param row Row to write.
+ * @param statHolder Statistics holder to track IO operations.
+ * @return Page identifier or 0 if no page found in free list.
+ * @throws IgniteCheckedException If failed.
+ */
+ private long takePage(int size, T row, IoStatisticsHolder statHolder) throws IgniteCheckedException {
+ long pageId = 0;
+
+ if (size < MIN_SIZE_FOR_DATA_PAGE) {
+ for (int b = bucket(size, false) + 1; b < REUSE_BUCKET; b++) {
+ pageId = takeEmptyPage(b, row.ioVersions(), statHolder);
+
+ if (pageId != 0L)
+ break;
+ }
+ }
+
+ if (pageId == 0L) { // Handle reuse bucket.
+ pageId = reuseList == this ?
+ takeEmptyPage(REUSE_BUCKET, row.ioVersions(), statHolder) : reuseList.takeRecycledPage();
+ }
+
+ if (pageId == 0L)
+ return 0;
+
+ if (PageIdUtils.tag(pageId) != PageIdAllocator.FLAG_DATA) // Page is taken from reuse bucket.
+ return initReusedPage(row, pageId, statHolder);
+ else // Page is taken from free space bucket. For in-memory mode partition must be changed.
+ return PageIdUtils.changePartitionId(pageId, row.partition());
+ }
+
+ /**
+ * @param row Row.
* @param reusedPageId Reused page id.
- * @param partId Partition id.
* @param statHolder Statistics holder to track IO operations.
* @return Prepared page id.
*
* @see PagesList#initReusedPage(long, long, long, int, byte, PageIO)
*/
- private long initReusedPage(T row, long reusedPageId, int partId,
- IoStatisticsHolder statHolder) throws IgniteCheckedException {
+ private long initReusedPage(T row, long reusedPageId, IoStatisticsHolder statHolder) throws IgniteCheckedException {
long reusedPage = acquirePage(reusedPageId, statHolder);
try {
long reusedPageAddr = writeLock(reusedPageId, reusedPage);
@@ -546,7 +731,7 @@ public abstract class AbstractFreeList<T extends Storable> extends PagesList imp
try {
return initReusedPage(reusedPageId, reusedPage, reusedPageAddr,
- partId, PageIdAllocator.FLAG_DATA, row.ioVersions().latest());
+ row.partition(), PageIdAllocator.FLAG_DATA, row.ioVersions().latest());
}
finally {
writeUnlock(reusedPageId, reusedPage, reusedPageAddr, true);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeList.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeList.java
index 28f5a50..5914ae52 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeList.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeList.java
@@ -17,24 +17,34 @@
package org.apache.ignite.internal.processors.cache.persistence.freelist;
+import java.util.Collection;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.metric.IoStatisticsHolder;
import org.apache.ignite.internal.processors.cache.persistence.Storable;
import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler;
-import org.apache.ignite.internal.metric.IoStatisticsHolder;
/**
*/
public interface FreeList<T extends Storable> {
/**
* @param row Row.
+ * @param statHolder Statistics holder to track IO operations.
* @throws IgniteCheckedException If failed.
*/
public void insertDataRow(T row, IoStatisticsHolder statHolder) throws IgniteCheckedException;
/**
+ * @param rows Rows.
+ * @param statHolder Statistics holder to track IO operations.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void insertDataRows(Collection<T> rows, IoStatisticsHolder statHolder) throws IgniteCheckedException;
+
+ /**
* @param link Row link.
* @param row New row data.
+ * @param statHolder Statistics holder to track IO operations.
* @return {@code True} if was able to update row.
* @throws IgniteCheckedException If failed.
*/
@@ -46,6 +56,7 @@ public interface FreeList<T extends Storable> {
* @param arg Handler argument.
* @param <S> Argument type.
* @param <R> Result type.
+ * @param statHolder Statistics holder to track IO operations.
* @return Result.
* @throws IgniteCheckedException If failed.
*/
@@ -54,6 +65,7 @@ public interface FreeList<T extends Storable> {
/**
* @param link Row link.
+ * @param statHolder Statistics holder to track IO operations.
* @throws IgniteCheckedException If failed.
*/
public void removeDataRowByLink(long link, IoStatisticsHolder statHolder) throws IgniteCheckedException;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index 8d1ab87..f6900b8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -711,7 +711,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
boolean preload,
AffinityTopologyVersion topVer,
GridDrType drType,
- boolean fromStore
+ boolean fromStore,
+ CacheDataRow row
) throws IgniteCheckedException, GridCacheEntryRemovedException {
assert false;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListSelfTest.java
index 6364590..eae81c7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListSelfTest.java
@@ -18,9 +18,11 @@
package org.apache.ignite.internal.processors.database;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
@@ -74,6 +76,9 @@ public class CacheFreeListSelfTest extends GridCommonAbstractTest {
private static final long MB = 1024L * 1024L;
/** */
+ private static final int BATCH_SIZE = 100;
+
+ /** */
private PageMemory pageMem;
/** {@inheritDoc} */
@@ -90,6 +95,46 @@ public class CacheFreeListSelfTest extends GridCommonAbstractTest {
* @throws Exception if failed.
*/
@Test
+ public void testInsertDeleteSingleThreaded_batched_1024() throws Exception {
+ checkInsertDeleteSingleThreaded(1024, true);
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testInsertDeleteSingleThreaded_batched_2048() throws Exception {
+ checkInsertDeleteSingleThreaded(2048, true);
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testInsertDeleteSingleThreaded_batched_4096() throws Exception {
+ checkInsertDeleteSingleThreaded(4096, true);
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testInsertDeleteSingleThreaded_batched_8192() throws Exception {
+ checkInsertDeleteSingleThreaded(8192, true);
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testInsertDeleteSingleThreaded_batched_16384() throws Exception {
+ checkInsertDeleteSingleThreaded(16384, true);
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ @Test
public void testInsertDeleteSingleThreaded_1024() throws Exception {
checkInsertDeleteSingleThreaded(1024);
}
@@ -167,15 +212,64 @@ public class CacheFreeListSelfTest extends GridCommonAbstractTest {
}
/**
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testInsertDeleteMultiThreaded_batched_1024() throws Exception {
+ checkInsertDeleteMultiThreaded(1024, true);
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testInsertDeleteMultiThreaded_batched_2048() throws Exception {
+ checkInsertDeleteMultiThreaded(2048, true);
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testInsertDeleteMultiThreaded_batched_4096() throws Exception {
+ checkInsertDeleteMultiThreaded(4096, true);
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testInsertDeleteMultiThreaded_batched_8192() throws Exception {
+ checkInsertDeleteMultiThreaded(8192, true);
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testInsertDeleteMultiThreaded_batched_16384() throws Exception {
+ checkInsertDeleteMultiThreaded(16384, true);
+ }
+
+ /**
+ * @param pageSize Page size.
+ * @throws Exception if failed.
+ */
+ protected void checkInsertDeleteMultiThreaded(int pageSize) throws Exception {
+ checkInsertDeleteMultiThreaded(pageSize, false);
+ }
+
+ /**
* @param pageSize Page size.
+ * @param batched Batch mode flag.
* @throws Exception If failed.
*/
- protected void checkInsertDeleteMultiThreaded(final int pageSize) throws Exception {
- final FreeList list = createFreeList(pageSize);
+ protected void checkInsertDeleteMultiThreaded(final int pageSize, final boolean batched) throws Exception {
+ final FreeList<CacheDataRow> list = createFreeList(pageSize);
Random rnd = new Random();
- final ConcurrentMap<Long, TestDataRow> stored = new ConcurrentHashMap<>();
+ final ConcurrentMap<Long, CacheDataRow> stored = new ConcurrentHashMap<>();
for (int i = 0; i < 100; i++) {
int keySize = rnd.nextInt(pageSize * 3 / 2) + 10;
@@ -187,7 +281,7 @@ public class CacheFreeListSelfTest extends GridCommonAbstractTest {
assertTrue(row.link() != 0L);
- TestDataRow old = stored.put(row.link(), row);
+ CacheDataRow old = stored.put(row.link(), row);
assertNull(old);
}
@@ -196,6 +290,8 @@ public class CacheFreeListSelfTest extends GridCommonAbstractTest {
GridTestUtils.runMultiThreaded(new Callable<Object>() {
@Override public Object call() throws Exception {
+ List<CacheDataRow> rows = new ArrayList<>(BATCH_SIZE);
+
Random rnd = ThreadLocalRandom.current();
for (int i = 0; i < 200_000; i++) {
@@ -226,25 +322,45 @@ public class CacheFreeListSelfTest extends GridCommonAbstractTest {
TestDataRow row = new TestDataRow(keySize, valSize);
+ if (batched) {
+ rows.add(row);
+
+ if (rows.size() == BATCH_SIZE) {
+ list.insertDataRows(rows, IoStatisticsHolderNoOp.INSTANCE);
+
+ for (CacheDataRow row0 : rows) {
+ assertTrue(row0.link() != 0L);
+
+ CacheDataRow old = stored.put(row0.link(), row0);
+
+ assertNull(old);
+ }
+
+ rows.clear();
+ }
+
+ continue;
+ }
+
list.insertDataRow(row, IoStatisticsHolderNoOp.INSTANCE);
assertTrue(row.link() != 0L);
- TestDataRow old = stored.put(row.link(), row);
+ CacheDataRow old = stored.put(row.link(), row);
assertNull(old);
}
else {
- while (true) {
- Iterator<TestDataRow> it = stored.values().iterator();
+ while (!stored.isEmpty()) {
+ Iterator<CacheDataRow> it = stored.values().iterator();
if (it.hasNext()) {
- TestDataRow row = it.next();
+ CacheDataRow row = it.next();
- TestDataRow rmvd = stored.remove(row.link);
+ CacheDataRow rmvd = stored.remove(row.link());
if (rmvd != null) {
- list.removeDataRowByLink(row.link, IoStatisticsHolderNoOp.INSTANCE);
+ list.removeDataRowByLink(row.link(), IoStatisticsHolderNoOp.INSTANCE);
break;
}
@@ -259,14 +375,24 @@ public class CacheFreeListSelfTest extends GridCommonAbstractTest {
}
/**
+ * @param pageSize Page size.
* @throws Exception if failed.
*/
protected void checkInsertDeleteSingleThreaded(int pageSize) throws Exception {
- FreeList list = createFreeList(pageSize);
+ checkInsertDeleteSingleThreaded(pageSize, false);
+ }
+
+ /**
+ * @param pageSize Page size.
+ * @param batched Batch mode flag.
+ * @throws Exception if failed.
+ */
+ protected void checkInsertDeleteSingleThreaded(int pageSize, boolean batched) throws Exception {
+ FreeList<CacheDataRow> list = createFreeList(pageSize);
Random rnd = new Random();
- Map<Long, TestDataRow> stored = new HashMap<>();
+ Map<Long, CacheDataRow> stored = new HashMap<>();
for (int i = 0; i < 100; i++) {
int keySize = rnd.nextInt(pageSize * 3 / 2) + 10;
@@ -278,13 +404,15 @@ public class CacheFreeListSelfTest extends GridCommonAbstractTest {
assertTrue(row.link() != 0L);
- TestDataRow old = stored.put(row.link(), row);
+ CacheDataRow old = stored.put(row.link(), row);
assertNull(old);
}
boolean grow = true;
+ List<CacheDataRow> rows = new ArrayList<>(BATCH_SIZE);
+
for (int i = 0; i < 1_000_000; i++) {
if (grow) {
if (stored.size() > 20_000) {
@@ -309,25 +437,45 @@ public class CacheFreeListSelfTest extends GridCommonAbstractTest {
TestDataRow row = new TestDataRow(keySize, valSize);
+ if (batched) {
+ rows.add(row);
+
+ if (rows.size() == BATCH_SIZE) {
+ list.insertDataRows(rows, IoStatisticsHolderNoOp.INSTANCE);
+
+ for (CacheDataRow row0 : rows) {
+ assertTrue(row0.link() != 0L);
+
+ CacheDataRow old = stored.put(row0.link(), row0);
+
+ assertNull(old);
+ }
+
+ rows.clear();
+ }
+
+ continue;
+ }
+
list.insertDataRow(row, IoStatisticsHolderNoOp.INSTANCE);
assertTrue(row.link() != 0L);
- TestDataRow old = stored.put(row.link(), row);
+ CacheDataRow old = stored.put(row.link(), row);
assertNull(old);
}
else {
- Iterator<TestDataRow> it = stored.values().iterator();
+ Iterator<CacheDataRow> it = stored.values().iterator();
if (it.hasNext()) {
- TestDataRow row = it.next();
+ CacheDataRow row = it.next();
- TestDataRow rmvd = stored.remove(row.link);
+ CacheDataRow rmvd = stored.remove(row.link());
assertTrue(rmvd == row);
- list.removeDataRowByLink(row.link, IoStatisticsHolderNoOp.INSTANCE);
+ list.removeDataRowByLink(row.link(), IoStatisticsHolderNoOp.INSTANCE);
}
}
}
@@ -355,7 +503,7 @@ public class CacheFreeListSelfTest extends GridCommonAbstractTest {
* @return Free list.
* @throws Exception If failed.
*/
- protected FreeList createFreeList(int pageSize) throws Exception {
+ protected FreeList<CacheDataRow> createFreeList(int pageSize) throws Exception {
DataRegionConfiguration plcCfg = new DataRegionConfiguration()
.setInitialSize(1024 * MB)
.setMaxSize(1024 * MB);