You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by al...@apache.org on 2020/04/21 07:41:54 UTC
[ignite] branch ignite-2.8.1 updated: IGNITE-12734 Fixed scan query
over evicted partition - Fixes #7494.
This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch ignite-2.8.1
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-2.8.1 by this push:
new faf7f06 IGNITE-12734 Fixed scan query over evicted partition - Fixes #7494.
faf7f06 is described below
commit faf7f06e33b05c029738a992482ff85e0e5ab1f2
Author: Aleksey Plekhanov <pl...@gmail.com>
AuthorDate: Wed Mar 18 13:12:47 2020 +0300
IGNITE-12734 Fixed scan query over evicted partition - Fixes #7494.
(cherry picked from commit bcaae8deef07344c7615f4ba76c63a3e5e866d6b)
---
.../cache/IgniteCacheOffheapManager.java | 5 +
.../cache/IgniteCacheOffheapManagerImpl.java | 61 ++++++----
.../cache/persistence/GridCacheOffheapManager.java | 10 ++
.../cache/persistence/tree/BPlusTree.java | 8 +-
.../processors/cache/tree/CacheDataTree.java | 2 +
.../cache/query/CacheScanQueryFailoverTest.java | 133 ++++++++++++++++++++-
6 files changed, 190 insertions(+), 29 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 47d2b7b..8c0f43a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -1040,6 +1040,11 @@ public interface IgniteCacheOffheapManager {
public void destroy() throws IgniteCheckedException;
/**
+ * Mark store as destroyed.
+ */
+ public void markDestroyed() throws IgniteCheckedException;
+
+ /**
* Clears all the records associated with logical cache with given ID.
*
* @param cacheId Cache ID.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index d208578..c8abe69 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -926,40 +926,46 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
return true;
while (true) {
- if (cur == null) {
- if (dataIt.hasNext()) {
- CacheDataStore ds = dataIt.next();
+ try {
+ if (cur == null) {
+ if (dataIt.hasNext()) {
+ CacheDataStore ds = dataIt.next();
- curPart = ds.partId();
+ curPart = ds.partId();
- // Data page scan is disabled by default for scan queries.
- // TODO https://issues.apache.org/jira/browse/IGNITE-11998
- CacheDataTree.setDataPageScanEnabled(false);
+ // Data page scan is disabled by default for scan queries.
+ // TODO https://issues.apache.org/jira/browse/IGNITE-11998
+ CacheDataTree.setDataPageScanEnabled(false);
- try {
- if (mvccSnapshot == null)
- cur = cacheId == CU.UNDEFINED_CACHE_ID ? ds.cursor() : ds.cursor(cacheId);
- else {
- cur = cacheId == CU.UNDEFINED_CACHE_ID ?
- ds.cursor(mvccSnapshot) : ds.cursor(cacheId, mvccSnapshot);
+ try {
+ if (mvccSnapshot == null)
+ cur = cacheId == CU.UNDEFINED_CACHE_ID ? ds.cursor() : ds.cursor(cacheId);
+ else {
+ cur = cacheId == CU.UNDEFINED_CACHE_ID ?
+ ds.cursor(mvccSnapshot) : ds.cursor(cacheId, mvccSnapshot);
+ }
+ }
+ finally {
+ CacheDataTree.setDataPageScanEnabled(false);
}
}
- finally {
- CacheDataTree.setDataPageScanEnabled(false);
- }
+ else
+ break;
}
- else
- break;
- }
- if (cur.next()) {
- next = cur.get();
- next.key().partition(curPart);
+ if (cur.next()) {
+ next = cur.get();
+ next.key().partition(curPart);
- break;
+ break;
+ }
+ else
+ cur = null;
+ }
+ catch (IgniteCheckedException ex) {
+ throw new IgniteCheckedException("Failed to get next data row due to underlying cursor " +
+ "invalidation", ex);
}
- else
- cur = null;
}
return next != null;
@@ -2946,6 +2952,11 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
/** {@inheritDoc} */
+ @Override public void markDestroyed() {
+ dataTree.markDestroyed();
+ }
+
+ /** {@inheritDoc} */
@Override public void clear(int cacheId) throws IgniteCheckedException {
assert cacheId != CU.UNDEFINED_CACHE_ID;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index cd43d88..a255413 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -852,6 +852,8 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
ctx.database().checkpointReadUnlock();
}
+ store.markDestroyed();
+
((GridCacheDatabaseSharedManager)ctx.database()).schedulePartitionDestroy(grp.groupId(), partId);
}
@@ -2575,6 +2577,14 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
}
/** {@inheritDoc} */
+ @Override public void markDestroyed() throws IgniteCheckedException {
+ CacheDataStore delegate = init0(true);
+
+ if (delegate != null)
+ delegate.markDestroyed();
+ }
+
+ /** {@inheritDoc} */
@Override public GridCursor<? extends CacheDataRow> cursor(int cacheId) throws IgniteCheckedException {
CacheDataStore delegate = init0(true);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
index fc14a5b..833ec04 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
@@ -1006,9 +1006,9 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
/**
* Check if the tree is getting destroyed.
*/
- protected final void checkDestroyed() {
+ protected final void checkDestroyed() throws IgniteCheckedException {
if (destroyed.get())
- throw new IllegalStateException(CONC_DESTROY_MSG + getName());
+ throw new IgniteCheckedException(CONC_DESTROY_MSG + getName());
}
/** {@inheritDoc} */
@@ -2487,7 +2487,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
/**
* @return {@code True} if state was changed.
*/
- private boolean markDestroyed() {
+ public boolean markDestroyed() {
return destroyed.compareAndSet(false, true);
}
@@ -5404,6 +5404,8 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
* @throws IgniteCheckedException If failed.
*/
final boolean nextPage(L lastRow) throws IgniteCheckedException {
+ checkDestroyed();
+
updateLowerBound(lastRow);
for (;;) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
index a8f9c51..a617621 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
@@ -217,6 +217,8 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> {
* @throws IgniteCheckedException If failed.
*/
private boolean readNextDataPage() throws IgniteCheckedException {
+ checkDestroyed();
+
for (;;) {
if (++curPage >= pagesCnt) {
// Reread number of pages when we reach it (it may grow).
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/CacheScanQueryFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/CacheScanQueryFailoverTest.java
index 488ce18..8a0ebb2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/CacheScanQueryFailoverTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/CacheScanQueryFailoverTest.java
@@ -17,17 +17,30 @@
package org.apache.ignite.internal.processors.cache.query;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
import javax.cache.Cache;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteBinary;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityFunctionContext;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.failure.StopNodeOrHaltFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.cluster.NodeOrderComparator;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.testframework.GridTestUtils;
@@ -36,6 +49,7 @@ import org.junit.Test;
import static org.apache.ignite.cache.CacheMode.LOCAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.EVICTED;
/**
* ScanQuery failover test. Tests scenario where user supplied closures throw unhandled errors.
@@ -65,7 +79,8 @@ public class CacheScanQueryFailoverTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override protected boolean isRemoteJvm(String igniteInstanceName) {
- if(igniteInstanceName.equals("client") || igniteInstanceName.equals("server"))
+ if(igniteInstanceName.equals("client") || igniteInstanceName.equals("server")
+ || igniteInstanceName.startsWith("grid"))
return false;
else
return super.isRemoteJvm(igniteInstanceName);
@@ -78,6 +93,8 @@ public class CacheScanQueryFailoverTest extends GridCommonAbstractTest {
if (name.equals("client"))
cfg.setClientMode(true);
+ cfg.setConsistentId(name);
+
cfg.setFailureHandler(new StopNodeOrHaltFailureHandler());
return cfg;
@@ -115,6 +132,73 @@ public class CacheScanQueryFailoverTest extends GridCommonAbstractTest {
}
/**
+ * Tests a scan query while partitions are being concurrently evicted.
+ */
+ @Test
+ public void testScanQueryOnEvictedPartition() throws Exception {
+ cleanPersistenceDir();
+
+ DataStorageConfiguration dsCfg = new DataStorageConfiguration().setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration().setPersistenceEnabled(true));
+
+ IgniteEx grid0 = startGrid(getConfiguration("grid0").setDataStorageConfiguration(dsCfg));
+
+ grid0.cluster().active(true);
+
+ IgniteCache<Integer, Integer> cache1 = grid0.getOrCreateCache(
+ new CacheConfiguration<Integer, Integer>("cache1")
+ .setAffinity(new RoundRobinAffinityFunction(2))
+ );
+
+ IgniteCache<Integer, Integer> cache2 = grid0.getOrCreateCache(
+ new CacheConfiguration<Integer, Integer>("cache2")
+ .setAffinity(new RoundRobinAffinityFunction(2))
+ );
+
+ cache1.put(0, 0); // Put to partition 0.
+ cache1.put(1, 1); // Put to partition 1.
+
+ cache2.put(0, 0); // Put to partition 0.
+
+ for (int i = 1; i < 1_000; i += 2)
+ cache2.put(i, i); // Put to partition 1.
+
+ Iterator iter1 = cache1.query(new ScanQuery<>().setPageSize(1)).iterator();
+ Iterator iter2 = cache1.query(new ScanQuery<>().setPageSize(1)).iterator();
+
+ // Iter 1 checks the case when the cursor is switched to an evicted partition.
+ iter1.next();
+
+ // Iter 2 checks the case when the cursor is already moving through a partition and that partition is evicted.
+ iter2.next();
+ iter2.next();
+
+ startGrid(getConfiguration("grid1").setDataStorageConfiguration(dsCfg));
+
+ grid0.cluster().setBaselineTopology(grid0.cluster().topologyVersion());
+
+ // Wait for partition 1 of each cache to be rebalanced to grid 1 and evicted from grid 0.
+ awaitPartitionMapExchange();
+
+ assertTrue(GridTestUtils.waitForCondition(() ->
+ grid0.cachex("cache1").context().topology().localPartition(1).state() == EVICTED &&
+ grid0.cachex("cache2").context().topology().localPartition(1).state() == EVICTED,
+ 1_000L));
+
+ // Force a checkpoint to destroy the stores of the evicted partitions.
+ forceCheckpoint(grid0);
+
+ GridTestUtils.assertThrowsAnyCause(log, iter1::next, IgniteException.class, "Failed to get next data row");
+
+ GridTestUtils.assertThrowsAnyCause(log, () -> {
+ while (iter2.hasNext())
+ iter2.next();
+
+ return null;
+ }, IgniteException.class, "Failed to get next data row");
+ }
+
+ /**
* @param ignite Ignite instance.
* @param configs Cache configurations.
*/
@@ -174,4 +258,51 @@ public class CacheScanQueryFailoverTest extends GridCommonAbstractTest {
integerBinaryObjectEntry -> {
throw new Error("Poison pill");
};
+
+ /**
+ * Affinity function that distributes partitions across nodes in round-robin order.
+ */
+ private static class RoundRobinAffinityFunction implements AffinityFunction {
+ /** Partitions count. */
+ private final int partitions;
+
+ /**
+ * @param partitions Partitions count.
+ */
+ public RoundRobinAffinityFunction(int partitions) {
+ this.partitions = partitions;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void reset() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public int partitions() {
+ return partitions;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int partition(Object key) {
+ return key.hashCode() % partitions;
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) {
+ List<List<ClusterNode>> res = new ArrayList<>(partitions);
+ List<ClusterNode> nodes = affCtx.currentTopologySnapshot();
+ nodes.sort(NodeOrderComparator.getInstance());
+
+ for (int i = 0; i < partitions; i++)
+ res.add(Collections.singletonList(nodes.get(i % nodes.size())));
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeNode(UUID nodeId) {
+ // No-op.
+ }
+ }
}