You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/05/17 18:42:14 UTC
[1/2] ignite git commit: IGNITE-7896 FilePageStore truncate now
actually remove redundant partition page file.
Repository: ignite
Updated Branches:
refs/heads/ignite-2.5 df34d192d -> 247ab2e3d
IGNITE-7896 FilePageStore truncate now actually remove redundant partition page file.
Signed-off-by: Andrey Gura <ag...@apache.org>
(cherry picked from commit d154eec)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4a722bbc
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4a722bbc
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4a722bbc
Branch: refs/heads/ignite-2.5
Commit: 4a722bbc57cf997b58a88778f32180353378d87c
Parents: df34d19
Author: Ivan Daschinskiy <iv...@gmail.com>
Authored: Thu May 10 20:00:12 2018 +0300
Committer: Andrey Gura <ag...@apache.org>
Committed: Thu May 17 21:07:41 2018 +0300
----------------------------------------------------------------------
.../cache/persistence/file/FilePageStore.java | 23 +--
.../IgnitePdsPartitionFilesTruncateTest.java | 153 +++++++++++++++++++
.../ignite/testsuites/IgnitePdsTestSuite2.java | 3 +
3 files changed, 170 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a722bbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
index fa02f5d..05f9421 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
@@ -260,26 +260,31 @@ public class FilePageStore implements PageStore {
public void truncate(int tag) throws PersistentStorageIOException {
lock.writeLock().lock();
+ long pages = this.pages();
+
try {
if (!inited)
return;
this.tag = tag;
- fileIO.clear();
-
- long newAlloc = initFile();
-
- long delta = newAlloc - allocated.getAndSet(newAlloc);
-
- assert delta % pageSize == 0;
-
- allocatedTracker.updateTotalAllocatedPages(delta / pageSize);
+ try {
+ fileIO.close();
+ }
+ finally {
+ cfgFile.delete();
+ }
}
catch (IOException e) {
throw new PersistentStorageIOException(e);
}
finally {
+ inited = false;
+
+ allocated.set(0);
+
+ allocatedTracker.updateTotalAllocatedPages(-1L * pages);
+
lock.writeLock().unlock();
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a722bbc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPartitionFilesTruncateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPartitionFilesTruncateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPartitionFilesTruncateTest.java
new file mode 100644
index 0000000..78c2453
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPartitionFilesTruncateTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence;
+
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Checks that evicted partitions doesn't leave files in PDS.
+ */
+public class IgnitePdsPartitionFilesTruncateTest extends GridCommonAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setConsistentId(gridName)
+ .setDataStorageConfiguration(new DataStorageConfiguration()
+ .setPageSize(1024)
+ .setWalMode(WALMode.LOG_ONLY)
+ .setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration()
+ .setPersistenceEnabled(true)))
+ .setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME)
+ .setBackups(1)
+ .setAffinity(new RendezvousAffinityFunction(false, 32)));
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTruncatingPartitionFilesOnEviction() throws Exception {
+ Ignite ignite0 = startGrids(3);
+
+ ignite0.cluster().active(true);
+
+ try (IgniteDataStreamer<Integer,String> streamer = ignite0.dataStreamer(DEFAULT_CACHE_NAME)) {
+ for (int i = 0; i < 1_000; i++)
+ streamer.addData(i, "Value " + i);
+ }
+
+ assertEquals(1, ignite0.cacheNames().size());
+
+ awaitPartitionMapExchange(true, true, null);
+
+ checkPartFiles(0);
+ checkPartFiles(1);
+ checkPartFiles(2);
+
+ stopGrid(2);
+
+ ignite0.cluster().setBaselineTopology(ignite0.cluster().topologyVersion());
+
+ awaitPartitionMapExchange(true, true, null);
+
+ checkPartFiles(0);
+ checkPartFiles(1);
+
+ startGrid(2);
+
+ ignite0.cluster().setBaselineTopology(ignite0.cluster().topologyVersion());
+
+ awaitPartitionMapExchange(true, true, null);
+
+ checkPartFiles(0);
+ checkPartFiles(1);
+ checkPartFiles(2);
+ }
+
+ /**
+ * @param idx Node index.
+ */
+ private void checkPartFiles(int idx) throws Exception {
+ Ignite ignite = grid(idx);
+
+ int[] parts = ignite.affinity(DEFAULT_CACHE_NAME).allPartitions(ignite.cluster().localNode());
+
+ Path dirPath = Paths.get(U.defaultWorkDirectory(), "db",
+ U.maskForFileName(ignite.configuration().getIgniteInstanceName()), "cache-" + DEFAULT_CACHE_NAME);
+
+ info("Path: " + dirPath.toString());
+
+ assertTrue(Files.exists(dirPath));
+
+ for (Path f : Files.newDirectoryStream(dirPath)) {
+ if (f.getFileName().toString().startsWith("part-"))
+ assertTrue("Node_" + idx +" should contains only partitions " + Arrays.toString(parts)
+ + ", but the file is redundant: " + f.getFileName(), anyMatch(parts, f));
+ }
+ }
+
+ /** */
+ private boolean anyMatch(int[] parts, Path f) {
+ Pattern ptrn = Pattern.compile("part-(\\d+).bin");
+ Matcher matcher = ptrn.matcher(f.getFileName().toString());
+
+ if (!matcher.find())
+ throw new IllegalArgumentException("File is not a partition:" + f.getFileName());
+
+ int part = Integer.parseInt(matcher.group(1));
+
+ for (int p: parts) {
+ if (p == part)
+ return true;
+ }
+
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a722bbc/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index 6d953cd..43de84e 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuo
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCorruptedStoreTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsExchangeDuringCheckpointTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsPageSizesTest;
+import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsPartitionFilesTruncateTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsRecoveryAfterFileCorruptionTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePersistentStoreDataStructuresTest;
import org.apache.ignite.internal.processors.cache.persistence.LocalWalModeChangeDuringRebalancingSelfTest;
@@ -100,6 +101,8 @@ public class IgnitePdsTestSuite2 extends TestSuite {
public static void addRealPageStoreTests(TestSuite suite) {
suite.addTestSuite(IgnitePdsPageSizesTest.class);
+ suite.addTestSuite(IgnitePdsPartitionFilesTruncateTest.class);
+
// Metrics test.
suite.addTestSuite(IgniteDataStorageMetricsSelfTest.class);
[2/2] ignite git commit: IGNITE-8320 Partition file can be truncated
only after checkpoint - Fixes #3985.
Posted by ag...@apache.org.
IGNITE-8320 Partition file can be truncated only after checkpoint - Fixes #3985.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
(cherry picked from commit 8cb35e1)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/247ab2e3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/247ab2e3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/247ab2e3
Branch: refs/heads/ignite-2.5
Commit: 247ab2e3d9270ccf6b7ebc75af4501d3dd9aeefa
Parents: 4a722bb
Author: Pavel Kovalenko <jo...@gmail.com>
Authored: Thu May 17 12:19:36 2018 +0300
Committer: Andrey Gura <ag...@apache.org>
Committed: Thu May 17 21:26:53 2018 +0300
----------------------------------------------------------------------
.../distributed/dht/GridDhtLocalPartition.java | 55 ++-
.../GridCacheDatabaseSharedManager.java | 383 ++++++++++++++--
.../persistence/GridCacheOffheapManager.java | 44 +-
.../cache/persistence/file/FilePageStore.java | 43 +-
.../wal/serializer/RecordDataV2Serializer.java | 2 +-
.../IgnitePdsCorruptedIndexTest.java | 341 ++++++++++++++
.../IgnitePdsPartitionFilesDestroyTest.java | 444 +++++++++++++++++++
.../IgnitePdsPartitionFilesTruncateTest.java | 153 -------
.../testframework/junits/GridAbstractTest.java | 9 +-
.../junits/multijvm/IgniteProcessProxy.java | 23 +-
.../ignite/testsuites/IgnitePdsTestSuite.java | 2 +
.../ignite/testsuites/IgnitePdsTestSuite2.java | 8 +-
.../query/h2/database/InlineIndexHelper.java | 1 -
.../IgnitePdsWithIndexingCoreTestSuite.java | 3 +
14 files changed, 1274 insertions(+), 237 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/247ab2e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index be74eff..2f112f5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -30,6 +30,7 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
@@ -89,6 +90,12 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
/** Maximum size for delete queue. */
public static final int MAX_DELETE_QUEUE_SIZE = Integer.getInteger(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE, 200_000);
+ /** ONLY FOR TEST PURPOSES: force test checkpoint on partition eviction. */
+ private static boolean forceTestCheckpointOnEviction = IgniteSystemProperties.getBoolean("TEST_CHECKPOINT_ON_EVICTION", false);
+
+ /** ONLY FOR TEST PURPOSES: partition id where test checkpoint was enforced during eviction. */
+ static volatile Integer partWhereTestCheckpointEnforced;
+
/** Maximum size for {@link #rmvQueue}. */
private final int rmvQueueMaxSize;
@@ -210,6 +217,10 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
try {
store = grp.offheap().createCacheDataStore(id);
+ // Log partition creation for further crash recovery purposes.
+ if (grp.walEnabled())
+ ctx.wal().log(new PartitionMetaStateRecord(grp.groupId(), id, state(), updateCounter()));
+
// Inject row cache cleaner on store creation
// Used in case the cache with enabled SqlOnheapCache is single cache at the cache group
if (ctx.kernalContext().query().moduleEnabled()) {
@@ -1042,6 +1053,16 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
ctx.database().checkpointReadUnlock();
}
}
+
+ if (forceTestCheckpointOnEviction) {
+ if (partWhereTestCheckpointEnforced == null && cleared >= fullSize()) {
+ ctx.database().forceCheckpoint("test").finishFuture().get();
+
+ log.warning("Forced checkpoint by test reasons for partition: " + this);
+
+ partWhereTestCheckpointEnforced = id;
+ }
+ }
}
catch (NodeStoppingException e) {
if (log.isDebugEnabled())
@@ -1337,10 +1358,10 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
*/
class ClearFuture extends GridFutureAdapter<Boolean> {
/** Flag indicates that eviction callback was registered on the current future. */
- private volatile boolean evictionCallbackRegistered;
+ private volatile boolean evictionCbRegistered;
/** Flag indicates that clearing callback was registered on the current future. */
- private volatile boolean clearingCallbackRegistered;
+ private volatile boolean clearingCbRegistered;
/** Flag indicates that future with all callbacks was finished. */
private volatile boolean finished;
@@ -1359,25 +1380,29 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
* @param updateSeq If {@code true} update topology sequence after successful eviction.
*/
private void registerEvictionCallback(boolean updateSeq) {
- if (evictionCallbackRegistered)
+ if (evictionCbRegistered)
return;
synchronized (this) {
// Double check
- if (evictionCallbackRegistered)
+ if (evictionCbRegistered)
return;
- evictionCallbackRegistered = true;
+ evictionCbRegistered = true;
// Initiates partition eviction and destroy.
listen(f -> {
- if (f.error() != null) {
- rent.onDone(f.error());
- } else if (f.isDone()) {
+ try {
+ // Check for errors.
+ f.get();
+
finishEviction(updateSeq);
}
+ catch (Exception e) {
+ rent.onDone(e);
+ }
- evictionCallbackRegistered = false;
+ evictionCbRegistered = false;
});
}
}
@@ -1386,21 +1411,21 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
* Registers clearing callback on the future.
*/
private void registerClearingCallback() {
- if (clearingCallbackRegistered)
+ if (clearingCbRegistered)
return;
synchronized (this) {
// Double check
- if (clearingCallbackRegistered)
+ if (clearingCbRegistered)
return;
- clearingCallbackRegistered = true;
+ clearingCbRegistered = true;
// Recreate cache data store in case of allowed fast eviction, and reset clear flag.
listen(f -> {
clear = false;
- clearingCallbackRegistered = false;
+ clearingCbRegistered = false;
});
}
}
@@ -1481,8 +1506,8 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
reset();
finished = false;
- evictionCallbackRegistered = false;
- clearingCallbackRegistered = false;
+ evictionCbRegistered = false;
+ clearingCbRegistered = false;
}
if (evictionRequested)
http://git-wip-us.apache.org/repos/asf/ignite/blob/247ab2e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index d847132..ca731d2 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -21,7 +21,6 @@ import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.io.RandomAccessFile;
-import java.io.Serializable;
import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@@ -85,7 +84,6 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.mem.DirectMemoryProvider;
-import org.apache.ignite.internal.mem.DirectMemoryRegion;
import org.apache.ignite.internal.mem.file.MappedFileMemoryProvider;
import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider;
import org.apache.ignite.internal.pagemem.FullPageId;
@@ -296,6 +294,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
};
+ /** Timeout between partition file destroy and checkpoint to handle it. */
+ private static final long PARTITION_DESTROY_CHECKPOINT_TIMEOUT = 30 * 1000; // 30 Seconds.
+
/** Checkpoint thread. Needs to be volatile because it is created in exchange worker. */
private volatile Checkpointer checkpointer;
@@ -389,6 +390,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/** Initially disabled cache groups. */
private Collection<Integer> initiallyGlobalWalDisabledGrps = new HashSet<>();
+ /** Initially local wal disabled groups. */
private Collection<Integer> initiallyLocalWalDisabledGrps = new HashSet<>();
/**
@@ -498,6 +500,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
final GridKernalContext kernalCtx = cctx.kernalContext();
if (!kernalCtx.clientNode()) {
+ checkpointer = new Checkpointer(cctx.igniteInstanceName(), "db-checkpoint-thread", log);
+
IgnitePageStoreManager store = cctx.pageStore();
assert store instanceof FilePageStoreManager : "Invalid page store manager was created: " + store;
@@ -632,6 +636,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
registrateMetricsMBean();
}
+ if (checkpointer == null)
+ checkpointer = new Checkpointer(cctx.igniteInstanceName(), "db-checkpoint-thread", log);
+
super.onActivate(ctx);
}
@@ -1441,8 +1448,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
snapshotMgr.restoreState();
- checkpointer = new Checkpointer(cctx.igniteInstanceName(), "db-checkpoint-thread", log);
-
new IgniteThread(cctx.igniteInstanceName(), "db-checkpoint-thread", checkpointer).start();
CheckpointProgressSnapshot chp = checkpointer.wakeupForCheckpoint(0, "node started");
@@ -1920,20 +1925,23 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
* @throws IgniteCheckedException If failed.
* @throws StorageException In case I/O error occurred during operations with storage.
*/
- private @Nullable WALPointer restoreMemory(CheckpointStatus status) throws IgniteCheckedException {
+ @Nullable private WALPointer restoreMemory(CheckpointStatus status) throws IgniteCheckedException {
return restoreMemory(status, false, (PageMemoryEx)metaStorage.pageMemory());
}
/**
* @param status Checkpoint status.
- * @param storeOnly If {@code True} restores Metastorage only.
+ * @param metastoreOnly If {@code True} restores Metastorage only.
* @param storePageMem Metastore page memory.
* @throws IgniteCheckedException If failed.
* @throws StorageException In case I/O error occurred during operations with storage.
*/
- private @Nullable WALPointer restoreMemory(CheckpointStatus status, boolean storeOnly,
- PageMemoryEx storePageMem) throws IgniteCheckedException {
- assert !storeOnly || storePageMem != null;
+ @Nullable private WALPointer restoreMemory(
+ CheckpointStatus status,
+ boolean metastoreOnly,
+ PageMemoryEx storePageMem
+ ) throws IgniteCheckedException {
+ assert !metastoreOnly || storePageMem != null;
if (log.isInfoEnabled())
log.info("Checking memory state [lastValidPos=" + status.endPtr + ", lastMarked="
@@ -1954,7 +1962,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
int applied = 0;
WALPointer lastRead = null;
- Collection<Integer> ignoreGrps = storeOnly ? Collections.emptySet() :
+ Collection<Integer> ignoreGrps = metastoreOnly ? Collections.emptySet() :
F.concat(false, initiallyGlobalWalDisabledGrps, initiallyLocalWalDisabledGrps);
try (WALIterator it = cctx.wal().replay(status.endPtr)) {
@@ -1990,7 +1998,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
// several repetitive restarts and the same pages may have changed several times.
int grpId = pageRec.fullPageId().groupId();
- if (storeOnly && grpId != METASTORAGE_CACHE_ID)
+ if (metastoreOnly && grpId != METASTORAGE_CACHE_ID)
continue;
if (!ignoreGrps.contains(grpId)) {
@@ -2020,22 +2028,47 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
break;
+ case PART_META_UPDATE_STATE:
+ PartitionMetaStateRecord metaStateRecord = (PartitionMetaStateRecord)rec;
+
+ {
+ int grpId = metaStateRecord.groupId();
+
+ if (metastoreOnly && grpId != METASTORAGE_CACHE_ID)
+ continue;
+
+ if (ignoreGrps.contains(grpId))
+ continue;
+
+ int partId = metaStateRecord.partitionId();
+
+ GridDhtPartitionState state = GridDhtPartitionState.fromOrdinal(metaStateRecord.state());
+
+ if (state == null || state == GridDhtPartitionState.EVICTED)
+ schedulePartitionDestroy(grpId, partId);
+ else
+ cancelOrWaitPartitionDestroy(grpId, partId);
+ }
+
+ break;
+
case PARTITION_DESTROY:
- PartitionDestroyRecord destroyRec = (PartitionDestroyRecord)rec;
+ PartitionDestroyRecord destroyRecord = (PartitionDestroyRecord)rec;
- final int gId = destroyRec.groupId();
+ {
+ int grpId = destroyRecord.groupId();
- if (storeOnly && gId != METASTORAGE_CACHE_ID)
- continue;
+ if (metastoreOnly && grpId != METASTORAGE_CACHE_ID)
+ continue;
+
+ if (ignoreGrps.contains(grpId))
+ continue;
- if (!ignoreGrps.contains(gId)) {
- final int pId = destroyRec.partitionId();
+ PageMemoryEx pageMem = grpId == METASTORAGE_CACHE_ID ? storePageMem : getPageMemoryForCacheGroup(grpId);
- PageMemoryEx pageMem = gId == METASTORAGE_CACHE_ID ? storePageMem : getPageMemoryForCacheGroup(gId);
+ pageMem.invalidate(grpId, destroyRecord.partitionId());
- pageMem.clearAsync(
- (grpId, pageId) -> grpId == gId && PageIdUtils.partId(pageId) == pId,
- true).get();
+ schedulePartitionDestroy(grpId, destroyRecord.partitionId());
}
break;
@@ -2046,7 +2079,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
int grpId = r.groupId();
- if (storeOnly && grpId != METASTORAGE_CACHE_ID)
+ if (metastoreOnly && grpId != METASTORAGE_CACHE_ID)
continue;
if (!ignoreGrps.contains(grpId)) {
@@ -2079,7 +2112,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
}
- if (storeOnly)
+ if (metastoreOnly)
return null;
if (status.needRestoreMemory()) {
@@ -2692,6 +2725,170 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
/**
+ * Adds given partition to checkpointer destroy queue.
+ *
+ * @param grpId Group ID.
+ * @param partId Partition ID.
+ */
+    public void schedulePartitionDestroy(int grpId, int partId) {
+        // Read the field once so the null check and the call use the same instance.
+        final Checkpointer cpThread = checkpointer;
+
+        // Nothing to schedule on if no checkpointer has been created.
+        if (cpThread == null)
+            return;
+
+        cpThread.schedulePartitionDestroy(cctx.cache().cacheGroup(grpId), grpId, partId);
+    }
+
+ /**
+ * Cancels or wait for partition destroy.
+ *
+ * @param grpId Group ID.
+ * @param partId Partition ID.
+ * @throws IgniteCheckedException If failed.
+ */
+    public void cancelOrWaitPartitionDestroy(int grpId, int partId) throws IgniteCheckedException {
+        // Read the field once so the null check and the call use the same instance.
+        final Checkpointer cpThread = checkpointer;
+
+        // Without a checkpointer there is nothing to cancel or wait for.
+        if (cpThread == null)
+            return;
+
+        cpThread.cancelOrWaitPartitionDestroy(grpId, partId);
+    }
+
+    /**
+     * Queue of partition destroy requests, keyed by the (group ID, partition ID) pair.
+     */
+    private static class PartitionDestroyQueue {
+        /** Requests that were added and not yet taken by {@link #beginDestroy} or {@link #cancelDestroy}. */
+        private final ConcurrentMap<T2<Integer, Integer>, PartitionDestroyRequest> pendingReqs =
+            new ConcurrentHashMap<>();
+
+        /**
+         * Registers a destroy request for the partition unless one is already pending;
+         * an already pending request is left in place.
+         *
+         * @param grpCtx Group context.
+         * @param grpId Group ID.
+         * @param partId Partition ID to destroy.
+         */
+        private void addDestroyRequest(@Nullable CacheGroupContext grpCtx, int grpId, int partId) {
+            T2<Integer, Integer> destroyId = new T2<>(grpId, partId);
+
+            PartitionDestroyRequest prev =
+                pendingReqs.putIfAbsent(destroyId, new PartitionDestroyRequest(grpId, partId));
+
+            // A duplicate request is tolerated only when no group context is supplied.
+            assert prev == null || grpCtx == null : "Must wait for old destroy request to finish before adding a new one "
+                + "[grpId=" + grpId
+                + ", grpName=" + grpCtx.cacheOrGroupName()
+                + ", partId=" + partId + ']';
+        }
+
+        /**
+         * Removes the pending request with the given ID and tries to start it.
+         * The request is removed from the queue whether or not it could be started.
+         *
+         * @param destroyId Destroy ID.
+         * @return Destroy request to complete if was not concurrently cancelled.
+         */
+        private PartitionDestroyRequest beginDestroy(T2<Integer, Integer> destroyId) {
+            PartitionDestroyRequest req = pendingReqs.remove(destroyId);
+
+            if (req == null || !req.beginDestroy())
+                return null;
+
+            return req;
+        }
+
+        /**
+         * Removes the pending request for the given partition and tries to cancel it.
+         *
+         * @param grpId Group ID.
+         * @param partId Partition ID.
+         * @return Destroy request to wait for if destroy has begun.
+         */
+        private PartitionDestroyRequest cancelDestroy(int grpId, int partId) {
+            PartitionDestroyRequest req = pendingReqs.remove(new T2<>(grpId, partId));
+
+            // Nothing to wait for if there was no request or it was cancelled before it started.
+            if (req == null || req.cancel())
+                return null;
+
+            return req;
+        }
+    }
+
+ /**
+ * Partition destroy request.
+ */
+ private static class PartitionDestroyRequest {
+ /** */
+ private final int grpId;
+
+ /** */
+ private final int partId;
+
+ /** Destroy cancelled flag. */
+ private boolean cancelled;
+
+ /** Destroy future. Not null if partition destroy has begun. */
+ private GridFutureAdapter<Void> destroyFut;
+
+ /**
+ * @param grpId Group ID.
+ * @param partId Partition ID.
+ */
+ private PartitionDestroyRequest(int grpId, int partId) {
+ this.grpId = grpId;
+ this.partId = partId;
+ }
+
+ /**
+ * Cancels partition destroy request.
+ *
+ * @return {@code False} if this request needs to be waited for.
+ */
+ private synchronized boolean cancel() {
+ if (destroyFut != null) {
+ assert !cancelled;
+
+ return false;
+ }
+
+ cancelled = true;
+
+ return true;
+ }
+
+ /**
+ * Initiates partition destroy.
+ *
+ * @return {@code True} if destroy request should be executed, {@code false} otherwise.
+ */
+ private synchronized boolean beginDestroy() {
+ if (cancelled) {
+ assert destroyFut == null;
+
+ return false;
+ }
+
+ if (destroyFut != null)
+ return false;
+
+ destroyFut = new GridFutureAdapter<>();
+
+ return true;
+ }
+
+ /**
+ *
+ */
+ private synchronized void onDone(Throwable err) {
+ assert destroyFut != null;
+
+ destroyFut.onDone(err);
+ }
+
+ /**
+ *
+ */
+ private void waitCompleted() throws IgniteCheckedException {
+ GridFutureAdapter<Void> fut;
+
+ synchronized (this) {
+ assert destroyFut != null;
+
+ fut = destroyFut;
+ }
+
+ fut.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return "PartitionDestroyRequest [grpId=" + grpId + ", partId=" + partId + ']';
+ }
+ }
+
+ /**
* Checkpointer object is used for notification on checkpoint begin, predicate is {@link #scheduledCp}<code>.nextCpTs - now
* > 0 </code>. Method {@link #wakeupForCheckpoint} uses notify, {@link #waitCheckpointEvent} uses wait
*/
@@ -2704,7 +2901,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
private volatile CheckpointProgress scheduledCp;
/** Current checkpoint. This field is updated only by checkpoint thread. */
- private volatile CheckpointProgress curCpProgress;
+ @Nullable private volatile CheckpointProgress curCpProgress;
/** Shutdown now. */
private volatile boolean shutdownNow;
@@ -2850,7 +3047,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
syncedPagesCntr = new AtomicInteger();
evictedPagesCntr = new AtomicInteger();
- boolean interrupted = true;
+ boolean success = false;
try {
if (chp.hasDelta()) {
@@ -2937,11 +3134,22 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
snapshotMgr.afterCheckpointPageWritten();
+ try {
+ destroyEvictedPartitions();
+ }
+ catch (IgniteCheckedException e) {
+ chp.progress.cpFinishFut.onDone(e);
+
+ cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
+
+ return;
+ }
+
// Must mark successful checkpoint only if there are no exceptions or interrupts.
- interrupted = false;
+ success = true;
}
finally {
- if (!interrupted)
+ if (success)
markCheckpointEnd(chp);
}
@@ -2992,6 +3200,122 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
/**
+ * Processes all evicted partitions scheduled for destroy.
+ * <p>
+ * Takes the destroy queue of {@code curCpProgress}, starts a destroy task for every pending request for which
+ * {@code beginDestroy()} returns {@code true}, waits for all started tasks and then clears the queue.
+ * A task is handed to {@code asyncRunner} when the runner is set; if the runner is absent or rejects the task,
+ * the task runs in the calling thread. Each task reports its outcome through {@code req.onDone(...)}.
+ * <p>
+ * NOTE(review): {@code curCpProgress} is declared {@code @Nullable} and is dereferenced here without a check;
+ * presumably this is safe because the method is invoked only while a checkpoint is in progress - confirm.
+ *
+ * @throws IgniteCheckedException If waiting for one of the started destroy tasks fails.
+ */
+ private void destroyEvictedPartitions() throws IgniteCheckedException {
+ PartitionDestroyQueue destroyQueue = curCpProgress.destroyQueue;
+
+ if (destroyQueue.pendingReqs.isEmpty())
+ return;
+
+ List<PartitionDestroyRequest> reqs = null;
+
+ for (final PartitionDestroyRequest req : destroyQueue.pendingReqs.values()) {
+ if (!req.beginDestroy())
+ continue;
+
+ final int grpId = req.grpId;
+ final int partId = req.partId;
+
+ CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
+
+ assert grp != null
+ : "Cache group is not initialized [grpId=" + grpId + "]";
+ assert grp.offheap() instanceof GridCacheOffheapManager
+ : "Destroying partition files when persistence is off " + grp.offheap();
+
+ final GridCacheOffheapManager offheap = (GridCacheOffheapManager) grp.offheap();
+
+ Runnable destroyPartTask = () -> {
+ try {
+ offheap.destroyPartitionStore(grpId, partId);
+
+ req.onDone(null);
+
+ if (log.isDebugEnabled())
+ log.debug("Partition file has destroyed [grpId=" + grpId + ", partId=" + partId + "]");
+ }
+ catch (Exception e) {
+ req.onDone(new IgniteCheckedException(
+ "Partition file destroy has failed [grpId=" + grpId + ", partId=" + partId + "]", e));
+ }
+ };
+
+ if (asyncRunner != null) {
+ try {
+ asyncRunner.execute(destroyPartTask);
+ }
+ catch (RejectedExecutionException ignore) {
+ // Run the task synchronously.
+ destroyPartTask.run();
+ }
+ }
+ else
+ destroyPartTask.run();
+
+ if (reqs == null)
+ reqs = new ArrayList<>();
+
+ reqs.add(req);
+ }
+
+ if (reqs != null)
+ for (PartitionDestroyRequest req : reqs)
+ req.waitCompleted();
+
+ destroyQueue.pendingReqs.clear();
+ }
+
+ /**
+ * Adds a destroy request for the given partition to the destroy queue of the scheduled checkpoint
+ * ({@code scheduledCp}). When a group context is provided, the checkpointer is additionally woken up with
+ * {@code PARTITION_DESTROY_CHECKPOINT_TIMEOUT}; with a {@code null} context no wakeup is requested.
+ *
+ * @param grpCtx Group context. Can be {@code null} in case of crash recovery.
+ * @param grpId Group ID.
+ * @param partId Partition ID.
+ */
+ private void schedulePartitionDestroy(@Nullable CacheGroupContext grpCtx, int grpId, int partId) {
+ synchronized (this) {
+ scheduledCp.destroyQueue.addDestroyRequest(grpCtx, grpId, partId);
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Partition file has been scheduled to destroy [grpId=" + grpId + ", partId=" + partId + "]");
+
+ if (grpCtx != null)
+ wakeupForCheckpoint(PARTITION_DESTROY_CHECKPOINT_TIMEOUT, "partition destroy");
+ }
+
+ /**
+ * Cancels a destroy of the given partition in the destroy queues of the scheduled and the current checkpoint.
+ * Whenever {@code cancelDestroy} hands back a request, this method waits for that request to complete.
+ * NOTE(review): presumably {@code cancelDestroy} returns a request only when its destroy has already begun and
+ * therefore can no longer be cancelled - confirm against {@code PartitionDestroyQueue}.
+ * <p>
+ * Each queue uses its own request variable, so a request returned by the scheduled checkpoint is never
+ * waited on a second time when there is no current checkpoint.
+ *
+ * @param grpId Group ID.
+ * @param partId Partition ID.
+ * @throws IgniteCheckedException If waiting for an in-progress destroy fails.
+ */
+ private void cancelOrWaitPartitionDestroy(int grpId, int partId) throws IgniteCheckedException {
+     PartitionDestroyRequest scheduledReq;
+
+     synchronized (this) {
+         scheduledReq = scheduledCp.destroyQueue.cancelDestroy(grpId, partId);
+     }
+
+     // Wait outside of the monitor so that other threads are not blocked meanwhile.
+     if (scheduledReq != null)
+         scheduledReq.waitCompleted();
+
+     PartitionDestroyRequest curReq = null;
+
+     synchronized (this) {
+         CheckpointProgress cur = curCpProgress;
+
+         // There may be no current checkpoint at all.
+         if (cur != null)
+             curReq = cur.destroyQueue.cancelDestroy(grpId, partId);
+     }
+
+     if (curReq != null)
+         curReq.waitCompleted();
+
+     if (log.isDebugEnabled())
+         log.debug("Partition file destroy has cancelled [grpId=" + grpId + ", partId=" + partId + "]");
+ }
+
+ /**
*
*/
@SuppressWarnings("WaitNotInLoop")
@@ -3136,7 +3460,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
}
- if (hasPages) {
+ if (hasPages || !curr.destroyQueue.pendingReqs.isEmpty()) {
assert cpPtr != null;
tracker.onWalCpRecordFsyncStart();
@@ -3611,6 +3935,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/** */
private volatile SnapshotOperation snapshotOperation;
+ /** Partitions destroy queue. */
+ private final PartitionDestroyQueue destroyQueue = new PartitionDestroyQueue();
+
/** Wakeup reason. */
private String reason;
http://git-wip-us.apache.org/repos/asf/ignite/blob/247ab2e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 5feaa25..50c90e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -148,8 +148,10 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
}
/** {@inheritDoc} */
- @Override protected CacheDataStore createCacheDataStore0(final int p)
- throws IgniteCheckedException {
+ @Override protected CacheDataStore createCacheDataStore0(int p) throws IgniteCheckedException {
+ if (ctx.database() instanceof GridCacheDatabaseSharedManager)
+ ((GridCacheDatabaseSharedManager) ctx.database()).cancelOrWaitPartitionDestroy(grp.groupId(), p);
+
boolean exists = ctx.pageStore() != null && ctx.pageStore().exists(grp.groupId(), p);
return new GridCacheDataStore(p, exists);
@@ -575,25 +577,41 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
/** {@inheritDoc} */
@Override protected void destroyCacheDataStore0(CacheDataStore store) throws IgniteCheckedException {
+ assert ctx.database() instanceof GridCacheDatabaseSharedManager
+ : "Destroying cache data store when persistence is not enabled: " + ctx.database();
+
+ int partId = store.partId();
+
ctx.database().checkpointReadLock();
try {
- int p = store.partId();
-
saveStoreMetadata(store, null, false, true);
-
- PageMemoryEx pageMemory = (PageMemoryEx)grp.dataRegion().pageMemory();
-
- int tag = pageMemory.invalidate(grp.groupId(), p);
-
- if (grp.walEnabled())
- ctx.wal().log(new PartitionDestroyRecord(grp.groupId(), p));
-
- ctx.pageStore().onPartitionDestroyed(grp.groupId(), p, tag);
}
finally {
ctx.database().checkpointReadUnlock();
}
+
+ ((GridCacheDatabaseSharedManager)ctx.database()).schedulePartitionDestroy(grp.groupId(), partId);
+ }
+
+ /**
+ * Invalidates page memory for given partition. Destroys partition store.
+ * <b>NOTE:</b> This method can be invoked only within checkpoint lock or checkpointer thread.
+ *
+ * @param grpId Group ID.
+ * @param partId Partition ID.
+ *
+ * @throws IgniteCheckedException If destroy has failed.
+ */
+ public void destroyPartitionStore(int grpId, int partId) throws IgniteCheckedException {
+ PageMemoryEx pageMemory = (PageMemoryEx)grp.dataRegion().pageMemory();
+
+ int tag = pageMemory.invalidate(grp.groupId(), partId);
+
+ if (grp.walEnabled())
+ ctx.wal().log(new PartitionDestroyRecord(grp.groupId(), partId));
+
+ ctx.pageStore().onPartitionDestroyed(grpId, partId, tag);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/247ab2e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
index 05f9421..ae4880d 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
@@ -21,6 +21,7 @@ import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import java.nio.file.Files;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -244,7 +245,7 @@ public class FilePageStore implements PageStore {
fileIO.close();
if (cleanFile)
- cfgFile.delete();
+ Files.delete(cfgFile.toPath());
}
catch (IOException e) {
throw new PersistentStorageIOException(e);
@@ -255,35 +256,34 @@ public class FilePageStore implements PageStore {
}
/**
+ * Truncates and deletes partition file.
*
+ * @param tag New partition tag.
+ * @throws PersistentStorageIOException If failed
*/
public void truncate(int tag) throws PersistentStorageIOException {
- lock.writeLock().lock();
+ init();
- long pages = this.pages();
+ lock.writeLock().lock();
try {
- if (!inited)
- return;
-
this.tag = tag;
- try {
- fileIO.close();
- }
- finally {
- cfgFile.delete();
- }
+ fileIO.clear();
+
+ fileIO.close();
+
+ Files.delete(cfgFile.toPath());
}
catch (IOException e) {
- throw new PersistentStorageIOException(e);
+ throw new PersistentStorageIOException("Failed to delete partition file: " + cfgFile.getPath(), e);
}
finally {
- inited = false;
-
allocated.set(0);
- allocatedTracker.updateTotalAllocatedPages(-1L * pages);
+ allocatedTracker.updateTotalAllocatedPages(-1L * this.pages());
+
+ inited = false;
lock.writeLock().unlock();
}
@@ -325,7 +325,7 @@ public class FilePageStore implements PageStore {
recover = false;
}
catch (IOException e) {
- throw new PersistentStorageIOException("Unable to finish recover", e);
+ throw new PersistentStorageIOException("Failed to finish recover", e);
}
finally {
lock.writeLock().unlock();
@@ -421,9 +421,9 @@ public class FilePageStore implements PageStore {
}
/**
- * @throws IgniteCheckedException If failed to initialize store file.
+ * @throws PersistentStorageIOException If failed to initialize store file.
*/
- private void init() throws IgniteCheckedException {
+ private void init() throws PersistentStorageIOException {
if (!inited) {
lock.writeLock().lock();
@@ -431,7 +431,7 @@ public class FilePageStore implements PageStore {
if (!inited) {
FileIO fileIO = null;
- IgniteCheckedException err = null;
+ PersistentStorageIOException err = null;
try {
this.fileIO = fileIO = ioFactory.create(cfgFile, CREATE, READ, WRITE);
@@ -451,7 +451,8 @@ public class FilePageStore implements PageStore {
inited = true;
}
catch (IOException e) {
- err = new PersistentStorageIOException("Could not initialize file: " + cfgFile.getName(), e);
+ err = new PersistentStorageIOException(
+ "Failed to initialize partition file: " + cfgFile.getName(), e);
throw err;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/247ab2e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
index efba611..b3a00be 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
@@ -92,7 +92,7 @@ public class RecordDataV2Serializer implements RecordDataSerializer {
return 8 + 1;
case EXCHANGE:
- return 4 /*type*/ + 8 /*timestamp*/ + 2 /*constId*/;
+ return 4 /*type*/ + 8 /*timestamp*/ + 2 /*constId*/;
case TX_RECORD:
return txRecordSerializer.size((TxRecord)rec);
http://git-wip-us.apache.org/repos/asf/ignite/blob/247ab2e3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedIndexTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedIndexTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedIndexTest.java
new file mode 100644
index 0000000..a1065f6
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedIndexTest.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.persistence;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.nio.file.OpenOption;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.SqlQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.testframework.junits.multijvm.IgniteProcessProxy;
+import org.junit.Assert;
+
+/**
+ * Test to reproduce corrupted indexes problem after partition file eviction and truncation.
+ */
+public class IgnitePdsCorruptedIndexTest extends GridCommonAbstractTest {
+ /** Cache name. */
+ private static final String CACHE = "cache";
+
+ /** Flag indicates that {@link HaltOnTruncateFileIO} should be used. */
+ private boolean haltFileIO;
+
+ /** MultiJVM flag. */
+ private boolean multiJvm = true;
+
+ /** Additional remote JVM args. */
+ private List<String> additionalArgs = Collections.emptyList();
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+     IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+     cfg.setConsistentId(igniteInstanceName);
+
+     // Persistent default data region.
+     DataRegionConfiguration dfltRegionCfg = new DataRegionConfiguration();
+
+     dfltRegionCfg.setMaxSize(512 * 1024 * 1024);
+     dfltRegionCfg.setPersistenceEnabled(true);
+
+     DataStorageConfiguration storageCfg = new DataStorageConfiguration();
+
+     storageCfg.setWalMode(WALMode.LOG_ONLY);
+     storageCfg.setCheckpointFrequency(10 * 60 * 1000);
+     storageCfg.setDefaultDataRegionConfiguration(dfltRegionCfg);
+
+     // Only nodes configured while the flag is raised get the halting file I/O.
+     if (haltFileIO)
+         storageCfg.setFileIOFactory(new HaltOnTruncateFileIOFactory(new RandomAccessFileIOFactory()));
+
+     cfg.setDataStorageConfiguration(storageCfg);
+
+     CacheConfiguration<Object, Object> cacheCfg = new CacheConfiguration<>(CACHE);
+
+     cacheCfg.setBackups(1);
+     cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+     cacheCfg.setIndexedTypes(Integer.class, IndexedObject.class, Long.class, IndexedObject.class);
+     cacheCfg.setAffinity(new RendezvousAffinityFunction(false, 32));
+
+     cfg.setCacheConfiguration(cacheCfg);
+
+     return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean isMultiJvm() {
+ return multiJvm;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected List<String> additionalRemoteJvmArgs() {
+ return additionalArgs;
+ }
+
+ /**
+ * Starts a multi-JVM cluster in which the node named "corrupted" uses {@link HaltOnTruncateFileIO}, loads
+ * indexed data and changes the baseline so that partitions get evicted. Once the halting node has left the
+ * topology, the cluster is restarted in a single JVM and range SQL queries are executed on the restarted node.
+ * If the index was corrupted, rebalance or one of the queries is expected to fail.
+ *
+ * @throws Exception If failed.
+ */
+ public void testCorruption() throws Exception {
+     final String corruptedNodeName = "corrupted";
+
+     IgniteEx ignite = startGrid(0);
+
+     haltFileIO = true;
+
+     additionalArgs = new ArrayList<>();
+     additionalArgs.add("-D" + "TEST_CHECKPOINT_ON_EVICTION=true");
+     additionalArgs.add("-D" + "IGNITE_QUIET=false");
+
+     IgniteEx corruptedNode = (IgniteEx) startGrid(corruptedNodeName);
+
+     additionalArgs.clear();
+
+     haltFileIO = false;
+
+     startGrid(2);
+
+     ignite.cluster().active(true);
+
+     awaitPartitionMapExchange();
+
+     final int entityCnt = 3_200;
+
+     try (IgniteDataStreamer<Object, Object> streamer = ignite.dataStreamer(CACHE)) {
+         streamer.allowOverwrite(true);
+
+         for (int i = 0; i < entityCnt; i++)
+             streamer.addData(i, new IndexedObject(i));
+     }
+
+     startGrid(3);
+
+     resetBaselineTopology();
+
+     // Corrupted node should be halted during partition destroy. Without this check the test would silently
+     // continue even if the node never halted and nothing was reproduced.
+     Assert.assertTrue("Corrupted node has not been halted during partition destroy",
+         GridTestUtils.waitForCondition(() -> ignite.cluster().nodes().size() == 3, getTestTimeout()));
+
+     // Clear remote JVM instance cache.
+     IgniteProcessProxy.kill(corruptedNode.name());
+
+     stopAllGrids();
+
+     // Disable multi-JVM mode and start coordinator and corrupted node in the same JVM.
+     multiJvm = false;
+
+     startGrid(0);
+
+     corruptedNode = (IgniteEx) startGrid(corruptedNodeName);
+
+     corruptedNode.cluster().active(true);
+
+     resetBaselineTopology();
+
+     // If index was corrupted, rebalance or one of the following queries should be failed.
+     awaitPartitionMapExchange();
+
+     IgniteCache<Integer, IndexedObject> cache1 = corruptedNode.cache(CACHE);
+
+     // Query the whole key range in four equal chunks.
+     for (int k = 0; k < entityCnt; k += entityCnt / 4) {
+         int l = k;
+         int r = k + entityCnt / 4 - 1;
+
+         log.info("Check range [" + l + "-" + r + "]");
+
+         QueryCursor<Cache.Entry<Long, IndexedObject>> qry =
+             cache1.query(new SqlQuery<Long, IndexedObject>(IndexedObject.class, "lVal between ? and ?")
+                 .setArgs(l * l, r * r));
+
+         Collection<Cache.Entry<Long, IndexedObject>> queried = qry.getAll();
+
+         log.info("Qry result size = " + queried.size());
+     }
+ }
+
+ /**
+ *
+ */
+ private static class IndexedObject {
+ /** Integer indexed value. */
+ @QuerySqlField(index = true)
+ private int iVal;
+
+ /** Long indexed value. */
+ @QuerySqlField(index = true)
+ private long lVal;
+
+ /** */
+ private byte[] payload = new byte[1024];
+
+ /**
+ * @param iVal Integer value.
+ */
+ private IndexedObject(int iVal) {
+ this.iVal = iVal;
+ this.lVal = (long) iVal * iVal;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (!(o instanceof IndexedObject))
+ return false;
+
+ IndexedObject that = (IndexedObject)o;
+
+ return iVal == that.iVal;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return iVal;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IndexedObject.class, this);
+ }
+ }
+
+ /**
+ * File I/O which halts the JVM after file truncation: once more than one truncation has been counted and
+ * the static field {@code GridDhtLocalPartition.partWhereTestCheckpointEnforced}, read reflectively, is
+ * non-null, {@code Runtime.halt(0)} is called.
+ */
+ private static class HaltOnTruncateFileIO extends FileIODecorator {
+ /** File. */
+ private final File file;
+
+ /** Overall number of truncations done so far; the counter is static, i.e. shared by all instances. */
+ private static final AtomicInteger truncations = new AtomicInteger();
+
+ /**
+ * @param delegate File I/O delegate
+ * @param file File this I/O works with; used for the diagnostic message only.
+ */
+ public HaltOnTruncateFileIO(FileIO delegate, File file) {
+ super(delegate);
+ this.file = file;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void clear() throws IOException {
+ super.clear();
+
+ System.err.println("Truncated file: " + file.getAbsolutePath());
+
+ truncations.incrementAndGet();
+
+ Integer checkpointedPart = null;
+ try {
+ Field field = GridDhtLocalPartition.class.getDeclaredField("partWhereTestCheckpointEnforced");
+
+ field.setAccessible(true);
+
+ checkpointedPart = (Integer) field.get(null);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ // Halt only after more than one file has been truncated and the checkpoint on partition eviction has been done.
+ if (truncations.get() > 1 && checkpointedPart != null) {
+ System.err.println("JVM is going to be crushed for test reasons...");
+
+ Runtime.getRuntime().halt(0);
+ }
+ }
+ }
+
+ /**
+ * I/O factory which wraps the delegate's I/O into {@link HaltOnTruncateFileIO} for partition files and
+ * returns the delegate's I/O untouched for all other files.
+ */
+ private static class HaltOnTruncateFileIOFactory implements FileIOFactory {
+     /** */
+     private static final long serialVersionUID = 0L;
+
+     /** Delegate factory. */
+     private final FileIOFactory delegateFactory;
+
+     /**
+      * @param delegateFactory Delegate factory.
+      */
+     HaltOnTruncateFileIOFactory(FileIOFactory delegateFactory) {
+         this.delegateFactory = delegateFactory;
+     }
+
+     /**
+      * @param file File.
+      * @return {@code True} if the file name contains "part" and ends with "bin".
+      */
+     private static boolean isPartitionFile(File file) {
+         String name = file.getName();
+
+         return name.contains("part") && name.endsWith("bin");
+     }
+
+     /**
+      * @param io I/O created by the delegate factory.
+      * @param file File the I/O was created for.
+      * @return Halting decorator for partition files, the given I/O otherwise.
+      */
+     private static FileIO wrapIfPartitionFile(FileIO io, File file) {
+         return isPartitionFile(file) ? new HaltOnTruncateFileIO(io, file) : io;
+     }
+
+     /** {@inheritDoc} */
+     @Override public FileIO create(File file) throws IOException {
+         return wrapIfPartitionFile(delegateFactory.create(file), file);
+     }
+
+     /** {@inheritDoc} */
+     @Override public FileIO create(File file, OpenOption... modes) throws IOException {
+         return wrapIfPartitionFile(delegateFactory.create(file, modes), file);
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/247ab2e3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPartitionFilesDestroyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPartitionFilesDestroyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPartitionFilesDestroyTest.java
new file mode 100644
index 0000000..b5afddf
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPartitionFilesDestroyTest.java
@@ -0,0 +1,444 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.persistence;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.OpenOption;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.failure.FailureHandler;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Assert;
+
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR;
+
+/**
+ * Test class to check that partition files after eviction are destroyed correctly on next checkpoint or crash recovery.
+ */
+public class IgnitePdsPartitionFilesDestroyTest extends GridCommonAbstractTest {
+ /** Cache name. */
+ private static final String CACHE = "cache";
+
+ /** Partitions count. */
+ private static final int PARTS_CNT = 32;
+
+ /** Set if I/O exception should be thrown on partition file truncation. */
+ private boolean failFileIo;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+     IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+     cfg.setConsistentId(igniteInstanceName);
+
+     // Persistent default data region.
+     DataRegionConfiguration regionCfg = new DataRegionConfiguration();
+
+     regionCfg.setMaxSize(512 * 1024 * 1024);
+     regionCfg.setPersistenceEnabled(true);
+
+     DataStorageConfiguration storageCfg = new DataStorageConfiguration();
+
+     storageCfg.setWalMode(WALMode.LOG_ONLY);
+     storageCfg.setCheckpointFrequency(10 * 60 * 1000);
+     storageCfg.setDefaultDataRegionConfiguration(regionCfg);
+
+     // Only nodes configured while the flag is raised get the failing file I/O.
+     if (failFileIo)
+         storageCfg.setFileIOFactory(new FailingFileIOFactory(new RandomAccessFileIOFactory()));
+
+     cfg.setDataStorageConfiguration(storageCfg);
+
+     CacheConfiguration<Object, Object> cacheCfg = new CacheConfiguration<>(CACHE);
+
+     cacheCfg.setBackups(1);
+     cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+     cacheCfg.setAffinity(new RendezvousAffinityFunction(false, PARTS_CNT));
+
+     cfg.setCacheConfiguration(cacheCfg);
+
+     return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+
+ failFileIo = false;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected FailureHandler getFailureHandler(String igniteInstanceName) {
+ return new StopNodeFailureHandler();
+ }
+
+ /**
+ * Streams keys {@code 0 .. keysCnt - 1} into the test cache with overwrite allowed; the value stored for
+ * key {@code k} is {@code k * multiplier}.
+ *
+ * @param ignite Ignite.
+ * @param keysCnt Keys count.
+ * @param multiplier Multiplier applied to a key to get its value.
+ */
+ private void loadData(IgniteEx ignite, int keysCnt, int multiplier) {
+     log.info("Load data: keys=" + keysCnt);
+
+     // Parameterized instead of the raw type, so key and value types are checked by the compiler.
+     try (IgniteDataStreamer<Integer, Integer> streamer = ignite.dataStreamer(CACHE)) {
+         streamer.allowOverwrite(true);
+
+         for (int k = 0; k < keysCnt; k++)
+             streamer.addData(k, k * multiplier);
+     }
+ }
+
+ /**
+ * Asserts that for every key in {@code 0 .. keysCnt - 1} the test cache of the given node returns
+ * {@code key * multiplier}.
+ *
+ * @param ignite Ignite.
+ * @param keysCnt Keys count.
+ * @param multiplier Multiplier applied to a key to get the expected value.
+ */
+ private void checkData(IgniteEx ignite, int keysCnt, int multiplier) {
+     log.info("Check data: " + ignite.name() + ", keys=" + keysCnt);
+
+     IgniteCache<Integer, Integer> cache = ignite.cache(CACHE);
+
+     for (int key = 0; key < keysCnt; key++) {
+         Integer exp = key * multiplier;
+
+         Assert.assertEquals("node = " + ignite.name() + ", key = " + key, exp, cache.get(key));
+     }
+ }
+
+ /**
+ * Test that partition files have been deleted correctly on next checkpoint.
+ * <p>
+ * Scenario: two nodes are started and loaded with data, two more nodes join and the baseline is reset to
+ * trigger eviction. Partition files on the coordinator are checked before and after a forced checkpoint,
+ * and finally the data is verified on all nodes.
+ *
+ * @throws Exception If failed.
+ */
+ public void testPartitionFileDestroyAfterCheckpoint() throws Exception {
+ IgniteEx crd = (IgniteEx) startGrids(2);
+
+ crd.cluster().active(true);
+
+ int keysCnt = 50_000;
+
+ loadData(crd, keysCnt, 1);
+
+ startGridsMultiThreaded(2, 2);
+
+ // Trigger partitions eviction.
+ resetBaselineTopology();
+
+ awaitPartitionMapExchange(true, true, null);
+
+ checkPartitionFiles(crd, true);
+
+ // This checkpoint should delete partition files.
+ forceCheckpoint();
+
+ checkPartitionFiles(crd, false);
+
+ for (Ignite ignite : G.allGrids())
+ checkData((IgniteEx) ignite, keysCnt, 1);
+ }
+
+ /**
+ * Test that partition files are reused correctly.
+ * <p>
+ * Scenario: eviction is triggered by extending the baseline, then one of the added nodes is stopped and the
+ * baseline is reset again so that partitions are re-created. The data is rewritten with another multiplier,
+ * a checkpoint is forced, and both the partition files of node 1 and the data on all nodes are verified.
+ *
+ * @throws Exception If failed.
+ */
+ public void testPartitionFileDestroyAndRecreate() throws Exception {
+ IgniteEx crd = startGrid(0);
+
+ IgniteEx node = startGrid(1);
+
+ crd.cluster().active(true);
+
+ int keysCnt = 50_000;
+
+ loadData(crd, keysCnt, 1);
+
+ startGridsMultiThreaded(2, 2);
+
+ // Trigger partitions eviction.
+ resetBaselineTopology();
+
+ awaitPartitionMapExchange(true, true, null);
+
+ checkPartitionFiles(node, true);
+
+ // Trigger partitions re-create.
+ stopGrid(2);
+
+ resetBaselineTopology();
+
+ awaitPartitionMapExchange(true, true, null);
+
+ checkPartitionFiles(node, true);
+
+ // Rewrite data.
+ loadData(crd, keysCnt, 2);
+
+ // Force checkpoint on all nodes.
+ forceCheckpoint();
+
+ // Check that all unnecessary partition files have been deleted.
+ checkPartitionFiles(node, false);
+
+ for (Ignite ignite : G.allGrids())
+ checkData((IgniteEx) ignite, keysCnt, 2);
+ }
+
+ /**
+ * Test that partition files have been deleted correctly during crash recovery.
+ * <p>
+ * Scenario: node 1 is started while {@code failFileIo} is set, i.e. with {@code FailingFileIOFactory}.
+ * After eviction has been triggered, a checkpoint forced on that node is expected to fail and the node is
+ * expected to leave the topology. The node is then restarted with the regular file I/O and its partition
+ * files and the data on all nodes are verified.
+ *
+ * @throws Exception If failed.
+ */
+ public void testPartitionFileDestroyCrashRecovery1() throws Exception {
+ IgniteEx crd = startGrid(0);
+
+ failFileIo = true;
+
+ IgniteEx problemNode = startGrid(1);
+
+ failFileIo = false;
+
+ crd.cluster().active(true);
+
+ int keysCnt = 50_000;
+
+ loadData(crd, keysCnt, 1);
+
+ startGridsMultiThreaded(2, 2);
+
+ // Trigger partitions eviction.
+ resetBaselineTopology();
+
+ awaitPartitionMapExchange(true, true, null);
+
+ checkPartitionFiles(problemNode, true);
+
+ try {
+ forceCheckpoint(problemNode);
+
+ Assert.assertTrue("Checkpoint must be failed", false);
+ }
+ catch (Exception expected) { // AssertionError is not an Exception, so the assertion above is not swallowed here.
+ expected.printStackTrace();
+ }
+
+ // Problem node should be stopped after failed checkpoint.
+ waitForTopology(3);
+
+ problemNode = startGrid(1);
+
+ awaitPartitionMapExchange();
+
+ // After recovery all evicted partition files should be deleted from disk.
+ checkPartitionFiles(problemNode, false);
+
+ for (Ignite ignite : G.allGrids())
+ checkData((IgniteEx) ignite, keysCnt, 1);
+ }
+
+ /**
+ * Test that partition files are not deleted if the partitions were re-created
+ * and no checkpoint has been done in the meantime.
+ * <p>
+ * Scenario: node 1 is started while {@code failFileIo} is set, i.e. with {@code FailingFileIOFactory}.
+ * Eviction is triggered by extending the baseline, then node 2 is stopped and the baseline is reset again so
+ * that partitions are re-created. A checkpoint forced on node 1 is expected to fail and the node is expected
+ * to leave the topology. The node is then restarted with the regular file I/O and its partition files and
+ * the data on all nodes are verified.
+ *
+ * @throws Exception If failed.
+ */
+ public void testPartitionFileDestroyCrashRecovery2() throws Exception {
+ IgniteEx crd = startGrid(0);
+
+ failFileIo = true;
+
+ IgniteEx problemNode = startGrid(1);
+
+ failFileIo = false;
+
+ crd.cluster().active(true);
+
+ int keysCnt = 50_000;
+
+ loadData(crd, keysCnt, 1);
+
+ // Trigger partitions eviction.
+ startGridsMultiThreaded(2, 2);
+
+ resetBaselineTopology();
+
+ awaitPartitionMapExchange(true, true, null);
+
+ checkPartitionFiles(problemNode, true);
+
+ // Trigger partitions re-create.
+ stopGrid(2);
+
+ resetBaselineTopology();
+
+ awaitPartitionMapExchange(true, true, null);
+
+ checkPartitionFiles(problemNode, true);
+
+ try {
+ forceCheckpoint(problemNode);
+
+ Assert.assertTrue("Checkpoint must be failed", false);
+ }
+ catch (Exception expected) { // AssertionError is not an Exception, so the assertion above is not swallowed here.
+ expected.printStackTrace();
+ }
+
+ // Problem node should be stopped after failed checkpoint.
+ waitForTopology(2);
+
+ problemNode = startGrid(1);
+
+ awaitPartitionMapExchange();
+
+ // After recovery all evicted partition files should be deleted from disk.
+ checkPartitionFiles(problemNode, false);
+
+ for (Ignite ignite : G.allGrids())
+ checkData((IgniteEx) ignite, keysCnt, 1);
+ }
+
+ /**
+ * If {@code exists} is {@code true}, checks that the file of every partition in state EVICTED
+ * still exists, and that at least one EVICTED partition was found.
+ *
+ * If {@code exists} is {@code false}, checks that no partition file exists
+ * for a partition that is absent or has state EVICTED.
+ *
+ * @param ignite Node.
+ * @param exists If {@code true} method will check that partition file exists,
+ * in other case will check that file doesn't exist.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void checkPartitionFiles(IgniteEx ignite, boolean exists) throws IgniteCheckedException {
+ int evicted = 0;
+
+ GridDhtPartitionTopology top = ignite.cachex(CACHE).context().topology();
+
+ for (int p = 0; p < PARTS_CNT; p++) {
+ GridDhtLocalPartition part = top.localPartition(p);
+
+ File partFile = partitionFile(ignite, CACHE, p);
+
+ if (exists) {
+ if (part != null && part.state() == GridDhtPartitionState.EVICTED) {
+ Assert.assertTrue("Partition file has deleted ahead of time: " + partFile, partFile.exists());
+
+ // Count only partitions that really are EVICTED; counting every partition
+ // would make the "at least 1 eviction" check below always pass.
+ evicted++;
+ }
+ }
+ else {
+ if (part == null || part.state() == GridDhtPartitionState.EVICTED)
+ Assert.assertTrue("Partition file has not deleted: " + partFile, !partFile.exists());
+ }
+ }
+
+ if (exists)
+ Assert.assertTrue("There should be at least 1 eviction", evicted > 0);
+ }
+
+ /**
+ * Builds the location of a partition page file under the default store directory.
+ *
+ * @param ignite Ignite; dots in its name are replaced with underscores to form the node directory name.
+ * @param cacheName Cache name.
+ * @param partId Partition id.
+ * @return File {@code <node dir>/cache-<cacheName>/part-<partId>.bin} inside the store directory.
+ * @throws IgniteCheckedException If the work directory cannot be resolved.
+ */
+ private static File partitionFile(Ignite ignite, String cacheName, int partId) throws IgniteCheckedException {
+ File storeRoot = U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_STORE_DIR, false);
+
+ String nodeDir = ignite.name().replaceAll("\\.", "_");
+
+ String relPath = String.format("%s/cache-%s/part-%d.bin", nodeDir, cacheName, partId);
+
+ return new File(storeRoot, relPath);
+ }
+
+ /**
+ * File I/O decorator whose {@link #clear()} always throws an {@link IOException};
+ * no other method is overridden here.
+ */
+ static class FailingFileIO extends FileIODecorator {
+ /**
+ * @param delegate File I/O delegate passed to the decorator.
+ */
+ public FailingFileIO(FileIO delegate) {
+ super(delegate);
+ }
+
+ /** Always fails with an {@link IOException} instead of clearing the file. */
+ @Override public void clear() throws IOException {
+ throw new IOException("Test");
+ }
+ }
+
+ /**
+ * File I/O factory that hands out {@link FailingFileIO} for partition files
+ * and the delegate factory's own I/O for every other file.
+ */
+ static class FailingFileIOFactory implements FileIOFactory {
+ /** Factory that actually creates the file I/O. */
+ private final FileIOFactory delegateFactory;
+
+ /**
+ * @param delegateFactory Delegate factory.
+ */
+ FailingFileIOFactory(FileIOFactory delegateFactory) {
+ this.delegateFactory = delegateFactory;
+ }
+
+ /**
+ * @param file File.
+ * @return {@code true} if the file name contains "part" and ends with "bin".
+ */
+ private static boolean isPartitionFile(File file) {
+ String fileName = file.getName();
+
+ return fileName.contains("part") && fileName.endsWith("bin");
+ }
+
+ /** {@inheritDoc} */
+ @Override public FileIO create(File file) throws IOException {
+ FileIO io = delegateFactory.create(file);
+
+ return isPartitionFile(file) ? new FailingFileIO(io) : io;
+ }
+
+ /** {@inheritDoc} */
+ @Override public FileIO create(File file, OpenOption... modes) throws IOException {
+ FileIO io = delegateFactory.create(file, modes);
+
+ return isPartitionFile(file) ? new FailingFileIO(io) : io;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/247ab2e3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPartitionFilesTruncateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPartitionFilesTruncateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPartitionFilesTruncateTest.java
deleted file mode 100644
index 78c2453..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPartitionFilesTruncateTest.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.persistence;
-
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.Arrays;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteDataStreamer;
-import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.DataRegionConfiguration;
-import org.apache.ignite.configuration.DataStorageConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.configuration.WALMode;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-/**
- * Checks that evicted partitions doesn't leave files in PDS.
- */
-public class IgnitePdsPartitionFilesTruncateTest extends GridCommonAbstractTest {
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- cfg.setConsistentId(gridName)
- .setDataStorageConfiguration(new DataStorageConfiguration()
- .setPageSize(1024)
- .setWalMode(WALMode.LOG_ONLY)
- .setDefaultDataRegionConfiguration(
- new DataRegionConfiguration()
- .setPersistenceEnabled(true)))
- .setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME)
- .setBackups(1)
- .setAffinity(new RendezvousAffinityFunction(false, 32)));
-
- return cfg;
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- stopAllGrids();
-
- cleanPersistenceDir();
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- stopAllGrids();
-
- cleanPersistenceDir();
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testTruncatingPartitionFilesOnEviction() throws Exception {
- Ignite ignite0 = startGrids(3);
-
- ignite0.cluster().active(true);
-
- try (IgniteDataStreamer<Integer,String> streamer = ignite0.dataStreamer(DEFAULT_CACHE_NAME)) {
- for (int i = 0; i < 1_000; i++)
- streamer.addData(i, "Value " + i);
- }
-
- assertEquals(1, ignite0.cacheNames().size());
-
- awaitPartitionMapExchange(true, true, null);
-
- checkPartFiles(0);
- checkPartFiles(1);
- checkPartFiles(2);
-
- stopGrid(2);
-
- ignite0.cluster().setBaselineTopology(ignite0.cluster().topologyVersion());
-
- awaitPartitionMapExchange(true, true, null);
-
- checkPartFiles(0);
- checkPartFiles(1);
-
- startGrid(2);
-
- ignite0.cluster().setBaselineTopology(ignite0.cluster().topologyVersion());
-
- awaitPartitionMapExchange(true, true, null);
-
- checkPartFiles(0);
- checkPartFiles(1);
- checkPartFiles(2);
- }
-
- /**
- * @param idx Node index.
- */
- private void checkPartFiles(int idx) throws Exception {
- Ignite ignite = grid(idx);
-
- int[] parts = ignite.affinity(DEFAULT_CACHE_NAME).allPartitions(ignite.cluster().localNode());
-
- Path dirPath = Paths.get(U.defaultWorkDirectory(), "db",
- U.maskForFileName(ignite.configuration().getIgniteInstanceName()), "cache-" + DEFAULT_CACHE_NAME);
-
- info("Path: " + dirPath.toString());
-
- assertTrue(Files.exists(dirPath));
-
- for (Path f : Files.newDirectoryStream(dirPath)) {
- if (f.getFileName().toString().startsWith("part-"))
- assertTrue("Node_" + idx +" should contains only partitions " + Arrays.toString(parts)
- + ", but the file is redundant: " + f.getFileName(), anyMatch(parts, f));
- }
- }
-
- /** */
- private boolean anyMatch(int[] parts, Path f) {
- Pattern ptrn = Pattern.compile("part-(\\d+).bin");
- Matcher matcher = ptrn.matcher(f.getFileName().toString());
-
- if (!matcher.find())
- throw new IllegalArgumentException("File is not a partition:" + f.getFileName());
-
- int part = Integer.parseInt(matcher.group(1));
-
- for (int p: parts) {
- if (p == part)
- return true;
- }
-
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/247ab2e3/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index f5784eb..0092926 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -1015,7 +1015,14 @@ public abstract class GridAbstractTest extends TestCase {
}
}
- return new IgniteProcessProxy(cfg, log, locNode, resetDiscovery);
+ return new IgniteProcessProxy(cfg, log, locNode, resetDiscovery, additionalRemoteJvmArgs());
+ }
+
+ /**
+ * Extra JVM arguments for remote instances; the result is passed to the {@code IgniteProcessProxy}
+ * constructor. Returns an empty list unless overridden.
+ *
+ * @return Additional JVM args for remote instances.
+ */
+ protected List<String> additionalRemoteJvmArgs() {
+ return Collections.emptyList();
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/247ab2e3/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
index fb59ae2..1eb7ddb 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
@@ -19,6 +19,8 @@ package org.apache.ignite.testframework.junits.multijvm;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -139,10 +141,28 @@ public class IgniteProcessProxy implements IgniteEx {
* @param cfg Configuration.
* @param log Logger.
* @param locJvmGrid Local JVM grid.
+ * @throws Exception On error.
+ */
+ public IgniteProcessProxy(IgniteConfiguration cfg, IgniteLogger log, Ignite locJvmGrid, boolean discovery)
+ throws Exception {
+ this(cfg, log, locJvmGrid, discovery, Collections.emptyList());
+ }
+
+
+ /**
+ * @param cfg Configuration.
+ * @param log Logger.
+ * @param locJvmGrid Local JVM grid.
* @param resetDiscovery Reset DiscoverySpi at the configuration.
* @throws Exception On error.
*/
- public IgniteProcessProxy(IgniteConfiguration cfg, IgniteLogger log, Ignite locJvmGrid, boolean resetDiscovery)
+ public IgniteProcessProxy(
+ IgniteConfiguration cfg,
+ IgniteLogger log,
+ Ignite locJvmGrid,
+ boolean resetDiscovery,
+ List<String> additionalArgs
+ )
throws Exception {
this.cfg = cfg;
this.locJvmGrid = locJvmGrid;
@@ -151,6 +171,7 @@ public class IgniteProcessProxy implements IgniteEx {
String params = params(cfg, resetDiscovery);
Collection<String> filteredJvmArgs = filteredJvmArgs();
+ filteredJvmArgs.addAll(additionalArgs);
final CountDownLatch rmtNodeStartedLatch = new CountDownLatch(1);
http://git-wip-us.apache.org/repos/asf/ignite/blob/247ab2e3/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
index d11ceb3..e4c59b3 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
@@ -91,6 +91,8 @@ public class IgnitePdsTestSuite extends TestSuite {
/**
* Fills {@code suite} with PDS test subset, which operates with real page store and does actual disk operations.
*
+ * NOTE: These tests are also executed using I/O plugins.
+ *
* @param suite suite to add tests into.
*/
public static void addRealPageStoreTests(TestSuite suite) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/247ab2e3/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index 43de84e..1185ebd 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -25,7 +25,7 @@ import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuo
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCorruptedStoreTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsExchangeDuringCheckpointTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsPageSizesTest;
-import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsPartitionFilesTruncateTest;
+import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsPartitionFilesDestroyTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsRecoveryAfterFileCorruptionTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePersistentStoreDataStructuresTest;
import org.apache.ignite.internal.processors.cache.persistence.LocalWalModeChangeDuringRebalancingSelfTest;
@@ -91,18 +91,20 @@ public class IgnitePdsTestSuite2 extends TestSuite {
// Integrity test.
suite.addTestSuite(IgnitePdsRecoveryAfterFileCorruptionTest.class);
+
+ suite.addTestSuite(IgnitePdsPartitionFilesDestroyTest.class);
}
/**
* Fills {@code suite} with PDS test subset, which operates with real page store and does actual disk operations.
*
+ * NOTE: These tests are also executed using I/O plugins.
+ *
* @param suite suite to add tests into.
*/
public static void addRealPageStoreTests(TestSuite suite) {
suite.addTestSuite(IgnitePdsPageSizesTest.class);
- suite.addTestSuite(IgnitePdsPartitionFilesTruncateTest.class);
-
// Metrics test.
suite.addTestSuite(IgniteDataStorageMetricsSelfTest.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/247ab2e3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/InlineIndexHelper.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/InlineIndexHelper.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/InlineIndexHelper.java
index 5c42d0d..3419127 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/InlineIndexHelper.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/InlineIndexHelper.java
@@ -61,7 +61,6 @@ public class InlineIndexHelper {
Value.SHORT,
Value.INT,
Value.LONG,
- Value.LONG,
Value.FLOAT,
Value.DOUBLE,
Value.DATE,
http://git-wip-us.apache.org/repos/asf/ignite/blob/247ab2e3/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
index 943d43f..d33b20b 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
@@ -21,6 +21,7 @@ import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsAtomicCa
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsAtomicCacheRebalancingTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsBinaryMetadataOnClusterRestartTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsBinarySortObjectFieldsTest;
+import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCorruptedIndexTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsMarshallerMappingRestoreOnNodeStartTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsTxCacheHistoricalRebalancingTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsTxCacheRebalancingTest;
@@ -76,6 +77,8 @@ public class IgnitePdsWithIndexingCoreTestSuite extends TestSuite {
suite.addTestSuite(IgnitePdsThreadInterruptionTest.class);
suite.addTestSuite(IgnitePdsBinarySortObjectFieldsTest.class);
+ suite.addTestSuite(IgnitePdsCorruptedIndexTest.class);
+
return suite;
}
}