You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2018/10/30 06:08:53 UTC
[02/28] ignite git commit: IGNITE-8873 Added methods to preload
partitions to page memory - Fixes #5053.
IGNITE-8873 Added methods to preload partitions to page memory - Fixes #5053.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/28e3dec5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/28e3dec5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/28e3dec5
Branch: refs/heads/ignite-627
Commit: 28e3dec5b4c9bffcca5391cac8bee824973fc7a4
Parents: 51d8c9f
Author: Aleksei Scherbakov <al...@gmail.com>
Authored: Fri Oct 26 13:00:22 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Fri Oct 26 13:01:10 2018 +0300
----------------------------------------------------------------------
.../java/org/apache/ignite/IgniteCache.java | 49 +-
.../cache/GatewayProtectedCacheProxy.java | 36 ++
.../processors/cache/GridCacheAdapter.java | 132 +++++
.../processors/cache/GridCacheProxyImpl.java | 36 ++
.../cache/IgniteCacheOffheapManager.java | 20 +-
.../cache/IgniteCacheOffheapManagerImpl.java | 10 +
.../processors/cache/IgniteCacheProxyImpl.java | 30 ++
.../processors/cache/IgniteInternalCache.java | 23 +
.../topology/GridDhtPartitionsReservation.java | 2 +-
.../processors/cache/local/GridLocalCache.java | 23 +
.../persistence/GridCacheOffheapManager.java | 49 ++
.../db/IgnitePdsPartitionPreloadTest.java | 495 +++++++++++++++++++
.../multijvm/IgniteCacheProcessProxy.java | 15 +
.../ignite/testsuites/IgnitePdsTestSuite4.java | 3 +
.../cache/hibernate/HibernateCacheProxy.java | 15 +
.../ApiParity/CacheParityTest.cs | 5 +-
16 files changed, 937 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/28e3dec5/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
index 8479420..70ee0d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
@@ -1516,7 +1516,7 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
/**
* Gets a collection of lost partition IDs.
*
- * @return Lost paritions.
+ * @return Lost partitions.
*/
public Collection<Integer> lostPartitions();
@@ -1531,4 +1531,51 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
* Clear cluster statistics for this cache.
*/
public void clearStatistics();
+
+ /**
+ * Efficiently preloads cache primary partition into page memory.
+ * <p>
+ * This is useful for fast iteration over cache partition data if persistence is enabled and the data is "cold".
+ * <p>
+ * Preload will reduce available amount of page memory for subsequent operations and may lead to earlier page
+ * replacement.
+ * <p>
+ * This method is irrelevant for in-memory caches. Calling this method on an in-memory cache will result in
+ * exception.
+ * <p>
+ * This is the blocking variant: it returns only after preloading has finished. Use
+ * {@link #preloadPartitionAsync(int)} to obtain a future instead.
+ *
+ * @param partition Partition ID to preload.
+ */
+ public void preloadPartition(int partition);
+
+ /**
+ * Efficiently preloads cache partition into page memory.
+ * <p>
+ * This is useful for fast iteration over cache partition data if persistence is enabled and the data is "cold".
+ * <p>
+ * Preload will reduce available amount of page memory for subsequent operations and may lead to earlier page
+ * replacement.
+ * <p>
+ * This method is irrelevant for in-memory caches. Calling this method on an in-memory cache will result in
+ * exception.
+ * <p>
+ * This is the non-blocking variant of {@link #preloadPartition(int)}: the call returns a future instead of
+ * waiting for preloading to finish.
+ *
+ * @param partition Partition ID to preload.
+ * @return A future representing pending completion of the partition preloading.
+ */
+ public IgniteFuture<Void> preloadPartitionAsync(int partition);
+
+ /**
+ * Efficiently preloads cache partition into page memory if it exists on the local node.
+ * <p>
+ * This is useful for fast iteration over cache partition data if persistence is enabled and the data is "cold".
+ * <p>
+ * Preload will reduce available amount of page memory for subsequent operations and may lead to earlier page
+ * replacement.
+ * <p>
+ * This method is irrelevant for in-memory caches. Calling this method on an in-memory cache will result in
+ * exception.
+ * <p>
+ * Unlike {@link #preloadPartition(int)} this method only works with the local node and never sends a task to
+ * other nodes.
+ *
+ * @param partition Partition ID to preload.
+ * @return {@code True} if partition was preloaded, {@code false} if it doesn't belong to local node. The
+ *      distributed cache implementation also returns {@code false} when the local partition cannot be reserved
+ *      or is not in the {@code OWNING} state.
+ */
+ public boolean localPreloadPartition(int partition);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/28e3dec5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java
index c99eb00..0735a88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java
@@ -1494,6 +1494,42 @@ public class GatewayProtectedCacheProxy<K, V> extends AsyncSupportAdapter<Ignite
}
}
+ /** {@inheritDoc} */
+ @Override public void preloadPartition(int part) {
+ // Enter the operation gate for the duration of the delegated call.
+ final CacheOperationGate gate = onEnter();
+
+ try {
+ delegate.preloadPartition(part);
+ }
+ finally {
+ // The gate is left on both normal and exceptional completion.
+ onLeave(gate);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<Void> preloadPartitionAsync(int part) {
+ final CacheOperationGate gate = onEnter();
+
+ IgniteFuture<Void> fut;
+
+ try {
+ fut = delegate.preloadPartitionAsync(part);
+ }
+ finally {
+ // The gate is left as soon as the future has been obtained, not when the future completes.
+ onLeave(gate);
+ }
+
+ return fut;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean localPreloadPartition(int part) {
+ final CacheOperationGate gate = onEnter();
+
+ boolean preloaded;
+
+ try {
+ preloaded = delegate.localPreloadPartition(part);
+ }
+ finally {
+ // The gate is left on both normal and exceptional completion.
+ onLeave(gate);
+ }
+
+ return preloaded;
+ }
+
/**
* Safely get CacheGateway.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/28e3dec5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 9574d49..7cba419 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -90,6 +90,8 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheA
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocalAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
@@ -136,10 +138,13 @@ import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteProductVersion;
+import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.mxbean.CacheMetricsMXBean;
import org.apache.ignite.plugin.security.SecurityPermission;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.JobContextResource;
+import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
@@ -149,6 +154,7 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_KEY_VALIDATI
import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_RETRIES_COUNT;
import static org.apache.ignite.internal.GridClosureCallMode.BROADCAST;
import static org.apache.ignite.internal.processors.cache.CacheOperationContext.DFLT_ALLOW_ATOMIC_OPS_IN_TX;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
import static org.apache.ignite.internal.processors.dr.GridDrType.DR_LOAD;
import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_NO_FAILOVER;
@@ -179,6 +185,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** Maximum number of retries when topology changes. */
public static final int MAX_RETRIES = IgniteSystemProperties.getInteger(IGNITE_CACHE_RETRIES_COUNT, 100);
+ /** Minimum version supporting partition preloading. */
+ private static final IgniteProductVersion PRELOAD_PARTITION_SINCE = IgniteProductVersion.fromString("2.7.0");
+
/** Deserialization stash. */
private static final ThreadLocal<IgniteBiTuple<String, String>> stash = new ThreadLocal<IgniteBiTuple<String,
String>>() {
@@ -284,6 +293,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** Asynchronous operations limit semaphore. */
private Semaphore asyncOpsSem;
+ /** {@code True} if attempted to use partition preloading on outdated node. */
+ private volatile boolean partPreloadBadVerWarned;
+
/** Active. */
private volatile boolean active;
@@ -1262,6 +1274,31 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/**
+ * Runs a {@link PartitionPreloadJob} for the given partition as an affinity run over the cache data nodes.
+ * <p>
+ * When the primary node of the partition is not mapped, or its version is lower than
+ * {@link #PRELOAD_PARTITION_SINCE}, no job is executed: a warning is logged (guarded by a non-atomic
+ * "already warned" flag) and an already finished future is returned.
+ *
+ * @param part Partition id.
+ * @return Future.
+ * @throws IgniteCheckedException If failed.
+ */
+ private IgniteInternalFuture<?> executePreloadTask(int part) throws IgniteCheckedException {
+ ClusterGroup dataNodes = ctx.grid().cluster().forDataNodes(ctx.name());
+
+ @Nullable ClusterNode primary = ctx.affinity().primaryByPartition(part, ctx.topology().readyTopologyVersion());
+
+ boolean supported = primary != null && primary.version().compareTo(PRELOAD_PARTITION_SINCE) >= 0;
+
+ if (supported) {
+ return ctx.closures().affinityRun(Collections.singleton(name()), part,
+ new PartitionPreloadJob(ctx.name(), part), dataNodes.nodes(), null);
+ }
+
+ // Warn only while the flag is unset to avoid flooding the log on repeated calls.
+ if (!partPreloadBadVerWarned) {
+ U.warn(log(), "Attempting to execute partition preloading task on outdated or not mapped node " +
+ "[targetNodeVer=" + (primary == null ? "NA" : primary.version()) +
+ ", minSupportedNodeVer=" + PRELOAD_PARTITION_SINCE + ']');
+
+ partPreloadBadVerWarned = true;
+ }
+
+ return new GridFinishedFuture<>();
+ }
+
+ /**
* @param keys Keys.
* @param readers Readers flag.
*/
@@ -4961,6 +4998,55 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return new CacheEntryImpl<>((K)key0, (V)val0, entry.version());
}
+ /** {@inheritDoc} */
+ @Override public void preloadPartition(int part) throws IgniteCheckedException {
+ if (isLocal())
+ ctx.offheap().preloadPartition(part);
+ else
+ executePreloadTask(part).get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<?> preloadPartitionAsync(int part) throws IgniteCheckedException {
+ // Non-local cache: the future of the preload task is handed to the caller as is.
+ if (!isLocal())
+ return executePreloadTask(part);
+
+ // Local cache: schedule the preload locally, rethrowing a checked failure as an unchecked one.
+ return ctx.kernalContext().closure().runLocalSafe(() -> {
+ try {
+ ctx.offheap().preloadPartition(part);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ });
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Returns {@code false} without preloading when the local node is not an affinity node, the partition is not
+ * present locally, the partition cannot be reserved, or it is not in the {@code OWNING} state.
+ */
+ @Override public boolean localPreloadPartition(int part) throws IgniteCheckedException {
+ if (!ctx.affinityNode())
+ return false;
+
+ GridDhtPartitionTopology top = ctx.group().topology();
+
+ @Nullable GridDhtLocalPartition p = top.localPartition(part, top.readyTopologyVersion(), false);
+
+ // Reserve before entering the try block: release() below must only be paired with a successful reserve().
+ // Releasing after a failed reserve() could drop a reservation held by somebody else.
+ if (p == null || !p.reserve())
+ return false;
+
+ try {
+ if (p.state() != OWNING)
+ return false;
+
+ p.dataStore().preload();
+
+ return true;
+ }
+ finally {
+ p.release();
+ }
+ }
+
/**
*
*/
@@ -6692,6 +6778,52 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/**
+ * Partition preload job: resolves the cache by name on the executing node and preloads the given partition
+ * through the cache offheap manager.
+ */
+ @GridInternal
+ private static class PartitionPreloadJob implements IgniteRunnable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Ignite instance, injected on the executing node. */
+ @IgniteInstanceResource
+ private IgniteEx ignite;
+
+ /** Logger, injected on the executing node. */
+ @LoggerResource
+ private IgniteLogger log;
+
+ /** Cache name. */
+ private final String name;
+
+ /** Partition id. */
+ private final int part;
+
+ /**
+ * @param name Cache name.
+ * @param part Partition id.
+ */
+ public PartitionPreloadJob(String name, int part) {
+ this.name = name;
+ this.part = part;
+ }
+
+ /**
+ * Preloads the partition.
+ *
+ * @throws IgniteException If the cache cannot be found on this node or preloading failed.
+ */
+ @Override public void run() {
+ IgniteInternalCache<?, ?> cache = ignite.context().cache().cache(name);
+
+ // Fail with a descriptive error instead of a bare NullPointerException when the cache is unavailable.
+ if (cache == null) {
+ throw new IgniteException("Failed to preload the partition, cache is not found [cache=" + name +
+ ", partition=" + part + ']');
+ }
+
+ try {
+ cache.context().offheap().preloadPartition(part);
+ }
+ catch (IgniteCheckedException e) {
+ log.error("Failed to preload the partition [cache=" + name + ", partition=" + part + ']', e);
+
+ throw new IgniteException(e);
+ }
+ }
+ }
+
+ /**
* Iterator implementation for KeySet.
*/
private final class KeySetIterator implements Iterator<K> {
http://git-wip-us.apache.org/repos/asf/ignite/blob/28e3dec5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
index ebb8b5e..6b3ca56 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
@@ -239,6 +239,42 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
}
/** {@inheritDoc} */
+ @Override public void preloadPartition(int part) throws IgniteCheckedException {
+ // Enter the gate with this proxy's operation context, remembering the previous one.
+ final CacheOperationContext prevCtx = gate.enter(opCtx);
+
+ try {
+ delegate.preloadPartition(part);
+ }
+ finally {
+ // Hand the previous context back on both normal and exceptional completion.
+ gate.leave(prevCtx);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<?> preloadPartitionAsync(int part) throws IgniteCheckedException {
+ final CacheOperationContext prevCtx = gate.enter(opCtx);
+
+ IgniteInternalFuture<?> fut;
+
+ try {
+ fut = delegate.preloadPartitionAsync(part);
+ }
+ finally {
+ // The gate is left once the future has been obtained, not when the future completes.
+ gate.leave(prevCtx);
+ }
+
+ return fut;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean localPreloadPartition(int part) throws IgniteCheckedException {
+ final CacheOperationContext prevCtx = gate.enter(opCtx);
+
+ boolean preloaded;
+
+ try {
+ preloaded = delegate.localPreloadPartition(part);
+ }
+ finally {
+ // Hand the previous context back on both normal and exceptional completion.
+ gate.leave(prevCtx);
+ }
+
+ return preloaded;
+ }
+
+ /** {@inheritDoc} */
@Override public GridCacheProxyImpl<K, V> forSubjectId(UUID subjId) {
return new GridCacheProxyImpl<>(ctx, delegate,
opCtx != null ? opCtx.forSubjectId(subjId) :
http://git-wip-us.apache.org/repos/asf/ignite/blob/28e3dec5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index e9ec025..66d927f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -590,6 +590,14 @@ public interface IgniteCacheOffheapManager {
public long totalPartitionEntriesCount(int part);
/**
+ * Preload a partition. Must be called under partition reservation for DHT caches.
+ * <p>
+ * NOTE(review): the non-persistent manager implementation in this change rejects the call with an
+ * {@link IgniteCheckedException}; only the persistence-aware manager actually preloads.
+ *
+ * @param part Partition ID.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void preloadPartition(int part) throws IgniteCheckedException;
+
+ /**
*
*/
interface OffheapInvokeClosure extends IgniteTree.InvokeClosure<CacheDataRow> {
@@ -1054,7 +1062,7 @@ public interface IgniteCacheOffheapManager {
/**
* @param cntr Counter.
*/
- void updateInitialCounter(long cntr);
+ public void updateInitialCounter(long cntr);
/**
* Inject rows cache cleaner.
@@ -1068,11 +1076,17 @@ public interface IgniteCacheOffheapManager {
*
* @return PendingTree instance.
*/
- PendingEntriesTree pendingTree();
+ public PendingEntriesTree pendingTree();
/**
* Flushes pending update counters closing all possible gaps.
*/
- void finalizeUpdateCountres();
+ public void finalizeUpdateCountres();
+
+ /**
+ * Preloads this data store into page memory.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ public void preload() throws IgniteCheckedException;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/28e3dec5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index e547784..c450d02 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -336,6 +336,11 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
}
+ /** {@inheritDoc} */
+ @Override public void preloadPartition(int p) throws IgniteCheckedException {
+ // This manager implementation never preloads: per the message, the operation needs persistence enabled.
+ throw new IgniteCheckedException("Operation only applicable to caches with enabled persistence");
+ }
+
/**
* @param p Partition.
* @return Partition data.
@@ -2906,6 +2911,11 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
return pendingEntries;
}
+ /** {@inheritDoc} */
+ @Override public void preload() throws IgniteCheckedException {
+ // No-op: this store implementation has nothing to preload
+ // (presumably because it is used without persistence - TODO confirm).
+ }
+
/**
* @param cctx Cache context.
* @param key Key.
http://git-wip-us.apache.org/repos/asf/ignite/blob/28e3dec5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
index 776e1cb..addd189 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
@@ -1827,6 +1827,36 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
}
/** {@inheritDoc} */
+ @Override public void preloadPartition(int part) {
+ try {
+ delegate.preloadPartition(part);
+ }
+ catch (IgniteCheckedException checkedErr) {
+ // The public method declares no checked exceptions, so the internal failure is converted before rethrow.
+ throw cacheException(checkedErr);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<Void> preloadPartitionAsync(int part) {
+ try {
+ // The internal future is wrapped into a public one. NOTE(review): the cast from the wildcard-typed
+ // future looks unchecked; the result value is never used, but confirm no warning suppression is wanted.
+ return (IgniteFuture<Void>)createFuture(delegate.preloadPartitionAsync(part));
+ }
+ catch (IgniteCheckedException e) {
+ // Only failures raised while obtaining the future are converted here.
+ throw cacheException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean localPreloadPartition(int part) {
+ boolean preloaded;
+
+ try {
+ preloaded = delegate.localPreloadPartition(part);
+ }
+ catch (IgniteCheckedException checkedErr) {
+ // The public method declares no checked exceptions, so the internal failure is converted before rethrow.
+ throw cacheException(checkedErr);
+ }
+
+ return preloaded;
+ }
+
+ /** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(ctx);
http://git-wip-us.apache.org/repos/asf/ignite/blob/28e3dec5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
index cba2228..9d40190 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
@@ -1818,4 +1818,27 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> {
* @return A collection of lost partitions if a cache is in recovery state.
*/
public Collection<Integer> lostPartitions();
+
+ /**
+ * Preloads cache partition. Blocking variant of {@link #preloadPartitionAsync(int)}.
+ *
+ * @param part Partition ID.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void preloadPartition(int part) throws IgniteCheckedException;
+
+ /**
+ * Preloads cache partition without waiting for completion.
+ *
+ * @param part Partition ID.
+ * @return Future to be completed whenever preloading completes.
+ * @throws IgniteCheckedException If failed.
+ */
+ public IgniteInternalFuture<?> preloadPartitionAsync(int part) throws IgniteCheckedException;
+
+ /**
+ * Preloads cache partition if it exists on local node.
+ *
+ * @param part Partition ID.
+ * @return {@code True} if partition was preloaded, {@code false} if it doesn't belong to local node.
+ * @throws IgniteCheckedException If failed.
+ */
+ public boolean localPreloadPartition(int part) throws IgniteCheckedException;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/28e3dec5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionsReservation.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionsReservation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionsReservation.java
index 2682a89..5017486 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionsReservation.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionsReservation.java
@@ -179,7 +179,7 @@ public class GridDhtPartitionsReservation implements GridReservable {
*/
private static void tryEvict(GridDhtLocalPartition[] parts) {
if (parts == null) // Can be not initialized yet.
- return ;
+ return;
for (GridDhtLocalPartition part : parts)
tryEvict(part);
http://git-wip-us.apache.org/repos/asf/ignite/blob/28e3dec5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
index 7b7ac66..481a6cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
@@ -232,4 +232,27 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> {
@Override public long localSizeLong(int part, CachePeekMode[] peekModes) throws IgniteCheckedException {
return localSizeLong(peekModes);
}
+
+ /** {@inheritDoc} */
+ @Override public void preloadPartition(int part) throws IgniteCheckedException {
+ // Local cache: preload directly through the offheap manager of this cache context.
+ ctx.offheap().preloadPartition(part);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<?> preloadPartitionAsync(int part) throws IgniteCheckedException {
+ // Wrap the synchronous preload of this cache into a callable and hand it to ctx.closures().
+ Callable<Void> preloadCall = () -> {
+ preloadPartition(part);
+
+ return null;
+ };
+
+ return ctx.closures().callLocalSafe(preloadCall);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean localPreloadPartition(int part) throws IgniteCheckedException {
+ ctx.offheap().preloadPartition(part);
+
+ // Always reports success: a failure surfaces as an exception from the call above.
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/28e3dec5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 240fbbe..5f6511d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.pagemem.PageIdAllocator;
import org.apache.ignite.internal.pagemem.PageIdUtils;
import org.apache.ignite.internal.pagemem.PageMemory;
import org.apache.ignite.internal.pagemem.PageSupport;
+import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.WALPointer;
@@ -869,6 +870,21 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
return size;
}
+ /** {@inheritDoc} */
+ @Override public void preloadPartition(int part) throws IgniteCheckedException {
+ if (!grp.isLocal()) {
+ GridDhtLocalPartition locPart = grp.topology().localPartition(part, AffinityTopologyVersion.NONE, false, false);
+
+ // The caller is expected to hold a reservation on an existing local partition.
+ assert locPart != null && locPart.reservations() > 0;
+
+ locPart.dataStore().preload();
+
+ return;
+ }
+
+ // Local group: the data store is looked up directly by partition.
+ dataStore(part).preload();
+ }
+
/**
* Calculates free space of all partition data stores - number of bytes available for use in allocated pages.
*
@@ -1388,6 +1404,31 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
@Override public PendingEntriesTree pendingTree() {
return pendingTree0;
}
+
+ /** {@inheritDoc} */
+ @Override public void preload() throws IgniteCheckedException {
+ IgnitePageStoreManager storeMgr = ctx.pageStore();
+
+ // Nothing to preload without a page store manager.
+ if (storeMgr == null)
+ return;
+
+ final int pagesCnt = storeMgr.pages(grp.groupId(), partId);
+
+ long curPageId = pageMem.partitionMetaPageId(grp.groupId(), partId);
+
+ int pageIdx = 0;
+
+ // Walk the partition pages in order, starting from the meta page: each page is acquired and then
+ // released right away.
+ while (pageIdx < pagesCnt) {
+ long ptr = -1;
+
+ try {
+ ptr = pageMem.acquirePage(grp.groupId(), curPageId);
+ }
+ finally {
+ // Release only if the acquire actually produced a pointer.
+ if (ptr != -1)
+ pageMem.releasePage(grp.groupId(), curPageId, ptr);
+ }
+
+ curPageId++;
+ pageIdx++;
+ }
+ }
};
pendingTree = pendingTree0;
@@ -2245,6 +2286,14 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
throw new IgniteException(e);
}
}
+
+ /** {@inheritDoc} */
+ @Override public void preload() throws IgniteCheckedException {
+ CacheDataStore store = init0(true);
+
+ // No underlying store is available: nothing to preload.
+ if (store == null)
+ return;
+
+ store.preload();
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/28e3dec5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsPartitionPreloadTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsPartitionPreloadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsPartitionPreloadTest.java
new file mode 100644
index 0000000..b9d28ae
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsPartitionPreloadTest.java
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import javax.cache.Cache;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.LOCAL;
+
+/**
+ * Tests partition preloading for various cache modes, atomicity modes and roles of the node that
+ * triggers the preload (client, node excluded by the node filter, primary, backup).
+ */
+public class IgnitePdsPartitionPreloadTest extends GridCommonAbstractTest {
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Number of entries streamed into the partition under test. */
+    public static final int ENTRY_CNT = 500;
+
+    /** Number of server nodes. */
+    private static final int GRIDS_CNT = 3;
+
+    /** Instance name of the client node. */
+    private static final String CLIENT_GRID_NAME = "client";
+
+    /** Name of the default data region (persistence enabled). */
+    public static final String DEFAULT_REGION = "default";
+
+    /** Name of the user attribute that carries the node id ({@code "node" + instance index}). */
+    private static final String TEST_ATTR = "testId";
+
+    /** Id of the server node that the cache node filter excludes. */
+    private static final String NO_CACHE_NODE = "node0";
+
+    /** Id (and consistent id) of the node treated as primary for the partition under test. */
+    private static final String PRIMARY_NODE = "node1";
+
+    /** Id (and consistent id) of the node treated as backup for the partition under test. */
+    private static final String BACKUP_NODE = "node2";
+
+    /** Name of the additional data region; persistence is not enabled for it in this test. */
+    public static final String MEM = "mem";
+
+    /** Bytes in a megabyte. */
+    public static final int MB = 1024 * 1024;
+
+    /** Produces the cache configuration for the current test; must be set before any node is started. */
+    private Supplier<CacheConfiguration<Integer, Integer>> cfgFactory;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setClientMode(CLIENT_GRID_NAME.equals(gridName));
+
+        if (!cfg.isClientMode()) {
+            // Servers get a stable id that is used both by the node filter and to find nodes by role.
+            String val = "node" + getTestIgniteInstanceIndex(gridName);
+
+            cfg.setUserAttributes(Collections.singletonMap(TEST_ATTR, val));
+            cfg.setConsistentId(val);
+        }
+
+        DataStorageConfiguration memCfg = new DataStorageConfiguration()
+            .setDataRegionConfigurations(new DataRegionConfiguration().setName(MEM).setInitialSize(10 * MB))
+            .setDefaultDataRegionConfiguration(
+                new DataRegionConfiguration().
+                    setMetricsEnabled(true).
+                    setMaxSize(50L * MB).
+                    setPersistenceEnabled(true).
+                    setName(DEFAULT_REGION))
+            .setWalMode(WALMode.LOG_ONLY)
+            .setWalSegmentSize(16 * MB)
+            .setPageSize(1024)
+            .setMetricsEnabled(true);
+
+        cfg.setDataStorageConfiguration(memCfg);
+
+        cfg.setCacheConfiguration(cfgFactory.get());
+
+        cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER));
+
+        return cfg;
+    }
+
+    /**
+     * @param atomicityMode Atomicity mode.
+     * @return Configuration of the default cache: 32 partitions, one backup, full-sync writes and a node filter
+     *      that excludes {@link #NO_CACHE_NODE}.
+     */
+    private CacheConfiguration<Integer, Integer> cacheConfiguration(CacheAtomicityMode atomicityMode) {
+        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
+
+        ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        ccfg.setAffinity(new RendezvousAffinityFunction(false, 32));
+        ccfg.setBackups(1);
+        ccfg.setNodeFilter(new TestIgnitePredicate());
+        ccfg.setAtomicityMode(atomicityMode);
+
+        return ccfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /**
+     * Local preload must report {@code false} when called on a client node and on the node excluded
+     * by the node filter.
+     */
+    public void testLocalPreloadPartitionClient() throws Exception {
+        cfgFactory = () -> cacheConfiguration(TRANSACTIONAL).setDataRegionName(MEM);
+
+        startGridsMultiThreaded(GRIDS_CNT);
+
+        IgniteEx client = startGrid(CLIENT_GRID_NAME);
+
+        assertNotNull(client.cache(DEFAULT_CACHE_NAME));
+
+        assertFalse(client.cache(DEFAULT_CACHE_NAME).localPreloadPartition(0));
+        assertFalse(grid(0).cache(DEFAULT_CACHE_NAME).localPreloadPartition(0));
+    }
+
+    /** Local preload called on the primary node. */
+    public void testLocalPreloadPartitionPrimary() throws Exception {
+        cfgFactory = () -> cacheConfiguration(TRANSACTIONAL);
+
+        preloadPartition(startedNode(PrimaryNodePredicate.INSTANCE), PreloadMode.LOCAL);
+    }
+
+    /** Local preload called on the backup node. */
+    public void testLocalPreloadPartitionBackup() throws Exception {
+        cfgFactory = () -> cacheConfiguration(TRANSACTIONAL);
+
+        preloadPartition(startedNode(BackupNodePredicate.INSTANCE), PreloadMode.LOCAL);
+    }
+
+    /** Preload of a cache placed in the {@link #MEM} region must fail when requested from a client. */
+    public void testPreloadPartitionInMemoryRemote() throws Exception {
+        cfgFactory = () -> cacheConfiguration(TRANSACTIONAL).setDataRegionName(MEM);
+
+        startGridsMultiThreaded(GRIDS_CNT);
+
+        IgniteEx client = startGrid(CLIENT_GRID_NAME);
+
+        assertNotNull(client.cache(DEFAULT_CACHE_NAME));
+
+        assertPreloadFails(client, 0);
+    }
+
+    /** Preload of a cache placed in the {@link #MEM} region must fail when requested from the primary node. */
+    public void testPreloadPartitionInMemoryLocal() throws Exception {
+        cfgFactory = () -> cacheConfiguration(TRANSACTIONAL).setDataRegionName(MEM);
+
+        startGridsMultiThreaded(GRIDS_CNT);
+
+        int key = 0;
+
+        Ignite prim = primaryNode(key, DEFAULT_CACHE_NAME);
+
+        int part = prim.affinity(DEFAULT_CACHE_NAME).partition(key);
+
+        assertPreloadFails(prim, part);
+    }
+
+    /** */
+    public void testPreloadPartitionTransactionalClientSync() throws Exception {
+        cfgFactory = () -> cacheConfiguration(TRANSACTIONAL);
+
+        preloadPartition(clientNode(), PreloadMode.SYNC);
+    }
+
+    /** */
+    public void testPreloadPartitionTransactionalClientAsync() throws Exception {
+        cfgFactory = () -> cacheConfiguration(TRANSACTIONAL);
+
+        preloadPartition(clientNode(), PreloadMode.ASYNC);
+    }
+
+    /** */
+    public void testPreloadPartitionTransactionalNodeFilteredSync() throws Exception {
+        cfgFactory = () -> cacheConfiguration(TRANSACTIONAL);
+
+        preloadPartition(filteredNode(), PreloadMode.SYNC);
+    }
+
+    /** */
+    public void testPreloadPartitionTransactionalNodeFilteredAsync() throws Exception {
+        cfgFactory = () -> cacheConfiguration(TRANSACTIONAL);
+
+        preloadPartition(filteredNode(), PreloadMode.ASYNC);
+    }
+
+    /** */
+    public void testPreloadPartitionTransactionalPrimarySync() throws Exception {
+        cfgFactory = () -> cacheConfiguration(TRANSACTIONAL);
+
+        preloadPartition(startedNode(PrimaryNodePredicate.INSTANCE), PreloadMode.SYNC);
+    }
+
+    /** */
+    public void testPreloadPartitionTransactionalPrimaryAsync() throws Exception {
+        cfgFactory = () -> cacheConfiguration(TRANSACTIONAL);
+
+        preloadPartition(startedNode(PrimaryNodePredicate.INSTANCE), PreloadMode.ASYNC);
+    }
+
+    /** */
+    public void testPreloadPartitionTransactionalBackupSync() throws Exception {
+        cfgFactory = () -> cacheConfiguration(TRANSACTIONAL);
+
+        preloadPartition(startedNode(BackupNodePredicate.INSTANCE), PreloadMode.SYNC);
+    }
+
+    /** */
+    public void testPreloadPartitionTransactionalBackupAsync() throws Exception {
+        cfgFactory = () -> cacheConfiguration(TRANSACTIONAL);
+
+        preloadPartition(startedNode(BackupNodePredicate.INSTANCE), PreloadMode.ASYNC);
+    }
+
+    /** */
+    public void testPreloadPartitionAtomicClientSync() throws Exception {
+        cfgFactory = () -> cacheConfiguration(ATOMIC);
+
+        preloadPartition(clientNode(), PreloadMode.SYNC);
+    }
+
+    /** */
+    public void testPreloadPartitionAtomicClientAsync() throws Exception {
+        cfgFactory = () -> cacheConfiguration(ATOMIC);
+
+        preloadPartition(clientNode(), PreloadMode.ASYNC);
+    }
+
+    /** */
+    public void testPreloadPartitionAtomicNodeFilteredSync() throws Exception {
+        cfgFactory = () -> cacheConfiguration(ATOMIC);
+
+        preloadPartition(filteredNode(), PreloadMode.SYNC);
+    }
+
+    /** */
+    public void testPreloadPartitionAtomicNodeFilteredAsync() throws Exception {
+        cfgFactory = () -> cacheConfiguration(ATOMIC);
+
+        preloadPartition(filteredNode(), PreloadMode.ASYNC);
+    }
+
+    /** */
+    public void testPreloadPartitionAtomicPrimarySync() throws Exception {
+        cfgFactory = () -> cacheConfiguration(ATOMIC);
+
+        preloadPartition(startedNode(PrimaryNodePredicate.INSTANCE), PreloadMode.SYNC);
+    }
+
+    /** */
+    public void testPreloadPartitionAtomicPrimaryAsync() throws Exception {
+        cfgFactory = () -> cacheConfiguration(ATOMIC);
+
+        preloadPartition(startedNode(PrimaryNodePredicate.INSTANCE), PreloadMode.ASYNC);
+    }
+
+    /** */
+    public void testPreloadPartitionAtomicBackupSync() throws Exception {
+        cfgFactory = () -> cacheConfiguration(ATOMIC);
+
+        preloadPartition(startedNode(BackupNodePredicate.INSTANCE), PreloadMode.SYNC);
+    }
+
+    /** */
+    public void testPreloadPartitionAtomicBackupAsync() throws Exception {
+        cfgFactory = () -> cacheConfiguration(ATOMIC);
+
+        preloadPartition(startedNode(BackupNodePredicate.INSTANCE), PreloadMode.ASYNC);
+    }
+
+    /** */
+    public void testPreloadLocalTransactionalSync() throws Exception {
+        cfgFactory = () -> cacheConfiguration(TRANSACTIONAL).setCacheMode(LOCAL);
+
+        preloadPartition(startedNode(PrimaryNodePredicate.INSTANCE), PreloadMode.SYNC);
+    }
+
+    /** */
+    public void testPreloadLocalTransactionalAsync() throws Exception {
+        cfgFactory = () -> cacheConfiguration(TRANSACTIONAL).setCacheMode(LOCAL);
+
+        preloadPartition(startedNode(PrimaryNodePredicate.INSTANCE), PreloadMode.ASYNC);
+    }
+
+    /**
+     * @return Factory that starts the client node on demand and returns it.
+     */
+    private Supplier<Ignite> clientNode() {
+        return () -> {
+            try {
+                return startGrid(CLIENT_GRID_NAME);
+            }
+            catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        };
+    }
+
+    /**
+     * @return Factory returning the server node with index 0, which the cache node filter excludes.
+     */
+    private Supplier<Ignite> filteredNode() {
+        return () -> grid(0);
+    }
+
+    /**
+     * @param role Predicate identifying the wanted node.
+     * @return Factory that looks up the first started node matching the predicate at the time it is invoked.
+     */
+    private static Supplier<Ignite> startedNode(Predicate<Ignite> role) {
+        return () -> G.allGrids().stream().filter(role).findFirst().orElseThrow(
+            () -> new AssertionError("No started node matches " + role.getClass().getSimpleName()));
+    }
+
+    /**
+     * Asserts that a synchronous partition preload requested from the given node throws.
+     *
+     * @param node Node to request the preload from.
+     * @param part Partition to preload.
+     */
+    private void assertPreloadFails(Ignite node, int part) {
+        try {
+            node.cache(DEFAULT_CACHE_NAME).preloadPartition(part);
+
+            fail("Exception is expected");
+        }
+        catch (Exception e) {
+            log.error("Expected", e);
+        }
+    }
+
+    /**
+     * Fills one partition with {@link #ENTRY_CNT} entries, restarts all nodes, triggers the preload from the
+     * node supplied by {@code testNodeFactory} and checks that reading the entries afterwards does not
+     * increase the pages-read metric of the default region.
+     *
+     * @param testNodeFactory Test node factory; invoked after the restart.
+     * @param preloadMode Preload mode.
+     * @throws Exception If failed.
+     */
+    private void preloadPartition(Supplier<Ignite> testNodeFactory, PreloadMode preloadMode) throws Exception {
+        Ignite crd = startGridsMultiThreaded(GRIDS_CNT);
+
+        Ignite primary = grid(1);
+
+        assertEquals(PRIMARY_NODE, primary.cluster().localNode().consistentId());
+
+        Integer key = primaryKey(primary.cache(DEFAULT_CACHE_NAME));
+
+        int preloadPart = crd.affinity(DEFAULT_CACHE_NAME).partition(key);
+
+        // Only keys that map to the partition under test are loaded.
+        try (IgniteDataStreamer<Integer, Integer> streamer = primary.dataStreamer(DEFAULT_CACHE_NAME)) {
+            int cnt = 0;
+
+            for (int k = 0; cnt < ENTRY_CNT; k++) {
+                if (primary.affinity(DEFAULT_CACHE_NAME).partition(k) == preloadPart) {
+                    streamer.addData(k, k);
+
+                    cnt++;
+                }
+            }
+        }
+
+        forceCheckpoint();
+
+        // Restart all nodes before preloading.
+        stopAllGrids();
+
+        startGridsMultiThreaded(GRIDS_CNT);
+
+        primary = startedNode(PrimaryNodePredicate.INSTANCE).get();
+
+        assertEquals(primary, primaryNode(key, DEFAULT_CACHE_NAME));
+
+        Ignite testNode = testNodeFactory.get();
+
+        switch (preloadMode) {
+            case SYNC:
+                testNode.cache(DEFAULT_CACHE_NAME).preloadPartition(preloadPart);
+
+                break;
+
+            case ASYNC:
+                testNode.cache(DEFAULT_CACHE_NAME).preloadPartitionAsync(preloadPart).get();
+
+                break;
+
+            case LOCAL:
+                assertTrue(testNode.cache(DEFAULT_CACHE_NAME).localPreloadPartition(preloadPart));
+
+                break;
+
+            default:
+                fail("Unexpected preload mode: " + preloadMode);
+        }
+
+        // Entries and the metric must come from the same node, and that node must hold the partition:
+        // a local preload only affects the caller, otherwise the primary owner is checked. A client or a
+        // node excluded by the filter has no local entries to read.
+        Ignite checkNode = preloadMode == PreloadMode.LOCAL ? testNode : primary;
+
+        long pagesReadBefore = checkNode.dataRegionMetrics(DEFAULT_REGION).getPagesRead();
+
+        // After partition preloading no pages should be read from store.
+        List<Cache.Entry<Object, Object>> entries =
+            U.arrayList(checkNode.cache(DEFAULT_CACHE_NAME).localEntries(), 1000);
+
+        assertEquals(ENTRY_CNT, entries.size());
+
+        assertEquals("Read pages count must be same", pagesReadBefore,
+            checkNode.dataRegionMetrics(DEFAULT_REGION).getPagesRead());
+    }
+
+    /** Node filter that accepts every node except the one whose test attribute is {@link #NO_CACHE_NODE}. */
+    private static class TestIgnitePredicate implements IgnitePredicate<ClusterNode> {
+        /** {@inheritDoc} */
+        @Override public boolean apply(ClusterNode node) {
+            return !NO_CACHE_NODE.equals(node.attribute(TEST_ATTR));
+        }
+    }
+
+    /** Matches the node whose consistent id is {@link #PRIMARY_NODE}. */
+    private static class PrimaryNodePredicate implements Predicate<Ignite> {
+        /** Shared instance. */
+        private static final PrimaryNodePredicate INSTANCE = new PrimaryNodePredicate();
+
+        /** {@inheritDoc} */
+        @Override public boolean test(Ignite ignite) {
+            return PRIMARY_NODE.equals(ignite.cluster().localNode().consistentId());
+        }
+    }
+
+    /** Matches the node whose consistent id is {@link #BACKUP_NODE}. */
+    private static class BackupNodePredicate implements Predicate<Ignite> {
+        /** Shared instance. */
+        private static final BackupNodePredicate INSTANCE = new BackupNodePredicate();
+
+        /** {@inheritDoc} */
+        @Override public boolean test(Ignite ignite) {
+            return BACKUP_NODE.equals(ignite.cluster().localNode().consistentId());
+        }
+    }
+
+    /** Way the preload is triggered. */
+    private enum PreloadMode {
+        /** Blocking call to {@code preloadPartition}. */
+        SYNC,
+
+        /** Call to {@code preloadPartitionAsync}, waiting on the returned future. */
+        ASYNC,
+
+        /** Call to {@code localPreloadPartition} on the test node. */
+        LOCAL
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/28e3dec5/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
index 681d904..18a2f7c 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
@@ -688,6 +688,21 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
}
/** {@inheritDoc} */
+ @Override public void preloadPartition(int partId) {
+ throw new UnsupportedOperationException("Method should be supported."); // Not implemented by this proxy: every call throws.
+ }
+
+ /** {@inheritDoc} Not implemented by this proxy: always throws {@link UnsupportedOperationException}. */
+ @Override public IgniteFuture<Void> preloadPartitionAsync(int partId) {
+ throw new UnsupportedOperationException("Method should be supported.");
+ }
+
+ /** {@inheritDoc} Not implemented by this proxy: always throws {@link UnsupportedOperationException}. */
+ @Override public boolean localPreloadPartition(int partition) {
+ throw new UnsupportedOperationException("Method should be supported.");
+ }
+
+ /** {@inheritDoc} */
@Override public IgniteCache<K, V> withAllowAtomicOpsInTx() {
return this;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/28e3dec5/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
index c164635..64615e2 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuo
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsRecoveryAfterFileCorruptionTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsTaskCancelingTest;
import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsPageEvictionDuringPartitionClearTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsPartitionPreloadTest;
import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsTransactionsHangTest;
import org.apache.ignite.internal.processors.cache.persistence.file.FileDownloaderTest;
@@ -44,6 +45,8 @@ public class IgnitePdsTestSuite4 extends TestSuite {
suite.addTestSuite(IgniteClusterActivateDeactivateTestWithPersistenceAndMemoryReuse.class);
+ suite.addTestSuite(IgnitePdsPartitionPreloadTest.class);
+
return suite;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/28e3dec5/modules/hibernate-core/src/main/java/org/apache/ignite/cache/hibernate/HibernateCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/hibernate-core/src/main/java/org/apache/ignite/cache/hibernate/HibernateCacheProxy.java b/modules/hibernate-core/src/main/java/org/apache/ignite/cache/hibernate/HibernateCacheProxy.java
index cdccc3e..527b767 100644
--- a/modules/hibernate-core/src/main/java/org/apache/ignite/cache/hibernate/HibernateCacheProxy.java
+++ b/modules/hibernate-core/src/main/java/org/apache/ignite/cache/hibernate/HibernateCacheProxy.java
@@ -647,6 +647,21 @@ public class HibernateCacheProxy implements IgniteInternalCache<Object, Object>
}
/** {@inheritDoc} */
+ @Override public void preloadPartition(int part) throws IgniteCheckedException {
+ delegate.preloadPartition(part); // Forwarded unchanged to the wrapped cache.
+ }
+
+ /** {@inheritDoc} Forwards the call unchanged to {@code delegate} and returns its future. */
+ @Override public IgniteInternalFuture<?> preloadPartitionAsync(int part) throws IgniteCheckedException {
+ return delegate.preloadPartitionAsync(part);
+ }
+
+ /** {@inheritDoc} Forwards the call unchanged to {@code delegate} and returns its result. */
+ @Override public boolean localPreloadPartition(int part) throws IgniteCheckedException {
+ return delegate.localPreloadPartition(part);
+ }
+
+ /** {@inheritDoc} */
@Nullable @Override public EntryProcessorResult invoke(
@Nullable AffinityTopologyVersion topVer,
Object key,
http://git-wip-us.apache.org/repos/asf/ignite/blob/28e3dec5/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/CacheParityTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/CacheParityTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/CacheParityTest.cs
index 7548740..d0a103f 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/CacheParityTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/CacheParityTest.cs
@@ -57,7 +57,10 @@ namespace Apache.Ignite.Core.Tests.ApiParity
"sizeLongAsync", // IGNITE-6563
"localSizeLong", // IGNITE-6563
"enableStatistics", // IGNITE-7276
- "clearStatistics" // IGNITE-9017
+ "clearStatistics", // IGNITE-9017
+ "preloadPartition", // IGNITE-9998
+ "preloadPartitionAsync", // IGNITE-9998
+ "localPreloadPartition", // IGNITE-9998
};
/// <summary>