You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2019/06/05 11:08:14 UTC
[ignite] 11/31: GG-18670 [IGNITE-11749] Implement automatic pages
history dump on CorruptedTreeException. Contributors: Anton Kalashnikov
Ivan Bessonov
This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch gg-19225
in repository https://gitbox.apache.org/repos/asf/ignite.git
commit 4af7b7dcd779ddc9f8308465a234d46bf188bda4
Author: Anton Kalashnikov <ka...@yandex.ru>
AuthorDate: Fri May 31 00:04:37 2019 +0300
GG-18670 [IGNITE-11749] Implement automatic pages history dump on CorruptedTreeException.
Contributors:
Anton Kalashnikov <ka...@yandex.ru>
Ivan Bessonov <be...@gmail.com>
Signed-off-by: Dmitriy Govorukhin <dm...@gmail.com>
---
.../apache/ignite/internal/GridKernalContext.java | 8 +
.../ignite/internal/GridKernalContextImpl.java | 12 +
.../org/apache/ignite/internal/IgniteKernal.java | 3 +
.../cache/persistence/CacheDataRowAdapter.java | 141 ++++++--
.../cache/persistence/tree/BPlusTree.java | 143 ++++++--
.../tree/BPlusTreeRuntimeException.java | 52 +++
.../persistence/tree/CorruptedTreeException.java | 81 ++++-
.../persistence/wal/FileWriteAheadLogManager.java | 7 +
.../cache/persistence/wal/SegmentRouter.java | 21 ++
.../wal/reader/FilteredWalIterator.java | 109 ++++++
.../wal/reader/IgniteWalIteratorFactory.java | 47 ++-
.../wal/reader/StandaloneGridKernalContext.java | 6 +
.../cache/persistence/wal/reader/WalFilters.java | 89 +++++
.../wal/scanner/PrintRawToFileHandler.java | 74 ++++
.../wal/scanner/PrintToFileHandler.java | 133 +++++++
.../persistence/wal/scanner/PrintToLogHandler.java | 71 ++++
.../persistence/wal/scanner/ScannerHandler.java | 69 ++++
.../persistence/wal/scanner/ScannerHandlers.java | 64 ++++
.../cache/persistence/wal/scanner/WalScanner.java | 164 +++++++++
.../processors/diagnostic/DiagnosticProcessor.java | 196 +++++++++++
.../diagnostic/PageHistoryDiagnoster.java | 381 +++++++++++++++++++++
.../processors/failure/FailureProcessor.java | 6 +
.../ignite/internal/util/IgniteStopwatch.java | 33 ++
.../util/lang/IgniteThrowableConsumer.java | 10 +-
...eConsumer.java => IgniteThrowableSupplier.java} | 26 +-
.../org/apache/ignite/lang/IgniteBiPredicate.java | 25 +-
.../CorruptedTreeFailureHandlingTest.java | 255 ++++++++++++++
.../wal/reader/FilteredWalIteratorTest.java | 214 ++++++++++++
.../persistence/wal/scanner/WalScannerTest.java | 337 ++++++++++++++++++
.../diagnostic/DiagnosticProcessorTest.java | 269 +++++++++++++++
.../ignite/testsuites/IgnitePdsTestSuite.java | 4 +
.../ignite/testsuites/IgnitePdsTestSuite2.java | 6 +
.../ignite/testsuites/IgnitePdsTestSuite4.java | 2 +
.../visor/verify/ValidateIndexesClosure.java | 16 +-
34 files changed, 2982 insertions(+), 92 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index c1f5ea4..d95e44f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -45,6 +45,7 @@ import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor;
import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor;
import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor;
+import org.apache.ignite.internal.processors.diagnostic.DiagnosticProcessor;
import org.apache.ignite.internal.processors.failure.FailureProcessor;
import org.apache.ignite.internal.processors.hadoop.HadoopHelper;
import org.apache.ignite.internal.processors.hadoop.HadoopProcessorAdapter;
@@ -453,6 +454,13 @@ public interface GridKernalContext extends Iterable<GridComponent> {
public LongJVMPauseDetector longJvmPauseDetector();
/**
+ * Gets diagnostic processor.
+ *
+ * @return Diagnostic processor.
+ */
+ public DiagnosticProcessor diagnostic();
+
+ /**
* Checks whether this node is invalid due to a critical error or not.
*
* @return {@code True} if this node is invalid, {@code false} otherwise.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 360bef8..8fa16f5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -49,6 +49,7 @@ import org.apache.ignite.internal.managers.indexing.GridIndexingManager;
import org.apache.ignite.internal.managers.loadbalancer.GridLoadBalancerManager;
import org.apache.ignite.internal.processors.cache.mvcc.MvccProcessor;
import org.apache.ignite.internal.worker.WorkersRegistry;
+import org.apache.ignite.internal.processors.diagnostic.DiagnosticProcessor;
import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor;
import org.apache.ignite.internal.processors.authentication.IgniteAuthenticationProcessor;
import org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager;
@@ -303,6 +304,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
@GridToStringExclude
private IgniteAuthenticationProcessor authProc;
+ /** Diagnostic processor. */
+ @GridToStringInclude
+ private DiagnosticProcessor diagnosticProcessor;
+
/** */
@GridToStringExclude
private List<GridComponent> comps = new LinkedList<>();
@@ -645,6 +650,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
internalSubscriptionProc = (GridInternalSubscriptionProcessor)comp;
else if (comp instanceof IgniteAuthenticationProcessor)
authProc = (IgniteAuthenticationProcessor)comp;
+ else if (comp instanceof DiagnosticProcessor)
+ diagnosticProcessor = (DiagnosticProcessor)comp;
else if (!(comp instanceof DiscoveryNodeValidationProcessor
|| comp instanceof PlatformPluginProcessor))
assert (comp instanceof GridPluginComponent) : "Unknown manager class: " + comp.getClass();
@@ -944,6 +951,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
}
/** {@inheritDoc} */
+ @Override public DiagnosticProcessor diagnostic() {
+ return diagnosticProcessor;
+ }
+
+ /** {@inheritDoc} */
@Override public void printMemoryStats() {
X.println(">>> ");
X.println(">>> Grid memory stats [igniteInstanceName=" + igniteInstanceName() + ']');
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index e68b4bd..c3dbf07 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -136,6 +136,7 @@ import org.apache.ignite.internal.processors.cluster.IGridClusterStateProcessor;
import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor;
import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor;
+import org.apache.ignite.internal.processors.diagnostic.DiagnosticProcessor;
import org.apache.ignite.internal.processors.failure.FailureProcessor;
import org.apache.ignite.internal.processors.hadoop.Hadoop;
import org.apache.ignite.internal.processors.hadoop.HadoopProcessorAdapter;
@@ -911,6 +912,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
longJVMPauseDetector
);
+ startProcessor(new DiagnosticProcessor(ctx));
+
mBeansMgr = new IgniteMBeansManager(this);
cfg.getMarshaller().setContext(ctx.marshallerContext());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
index 8670a1d..b3e946d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
@@ -30,10 +30,12 @@ import org.apache.ignite.internal.processors.cache.IncompleteCacheObject;
import org.apache.ignite.internal.processors.cache.IncompleteObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTreeRuntimeException;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.CacheVersionIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPagePayload;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -142,62 +144,79 @@ public class CacheDataRowAdapter implements CacheDataRow {
int grpId = grp != null ? grp.groupId() : 0;
- final long page = pageMem.acquirePage(grpId, pageId);
-
try {
- long pageAddr = pageMem.readLock(grpId, pageId, page); // Non-empty data page must not be recycled.
-
- assert pageAddr != 0L : nextLink;
+ final long page = pageMem.acquirePage(grpId, pageId);
try {
- DataPageIO io = DataPageIO.VERSIONS.forPage(pageAddr);
+ long pageAddr = pageMem.readLock(grpId, pageId, page); // Non-empty data page must not be recycled.
- DataPagePayload data = io.readPayload(pageAddr,
- itemId(nextLink),
- pageMem.realPageSize(grpId));
+ assert pageAddr != 0L : nextLink;
- nextLink = data.nextLink();
+ try {
+ DataPageIO io = DataPageIO.VERSIONS.forPage(pageAddr);
- int hdrLen = 0;
+ DataPagePayload data = io.readPayload(pageAddr,
+ itemId(nextLink),
+ pageMem.realPageSize(grpId));
- if (first) {
- if (nextLink == 0) {
- // Fast path for a single page row.
- readFullRow(sharedCtx, coctx, pageAddr + data.offset(), rowData, readCacheId);
+ nextLink = data.nextLink();
- return;
- }
+ int hdrLen = 0;
- first = false;
+ if (first) {
+ if (nextLink == 0) {
+ // Fast path for a single page row.
+ readFullRow(sharedCtx, coctx, pageAddr + data.offset(), rowData, readCacheId);
- // Assume that row header is always located entirely on the very first page.
- hdrLen = readHeader(sharedCtx, pageAddr, data.offset(), rowData);
+ return;
+ }
- if (rowData == LINK_WITH_HEADER)
- return;
- }
+ first = false;
+
+ // Assume that row header is always located entirely on the very first page.
+ hdrLen = readHeader(sharedCtx, pageAddr, data.offset(), rowData);
+
+ if (rowData == LINK_WITH_HEADER)
+ return;
+ }
- ByteBuffer buf = pageMem.pageBuffer(pageAddr);
+ ByteBuffer buf = pageMem.pageBuffer(pageAddr);
- int off = data.offset() + hdrLen;
- int payloadSize = data.payloadSize() - hdrLen;
+ int off = data.offset() + hdrLen;
+ int payloadSize = data.payloadSize() - hdrLen;
- buf.position(off);
- buf.limit(off + payloadSize);
+ buf.position(off);
+ buf.limit(off + payloadSize);
- boolean keyOnly = rowData == RowData.KEY_ONLY;
+ boolean keyOnly = rowData == RowData.KEY_ONLY;
- incomplete = readFragment(sharedCtx, coctx, buf, keyOnly, readCacheId, incomplete);
+ incomplete = readFragment(sharedCtx, coctx, buf, keyOnly, readCacheId, incomplete);
- if (keyOnly && key != null)
- return;
+ if (keyOnly && key != null)
+ return;
+ }
+ finally {
+ pageMem.readUnlock(grpId, pageId, page);
+ }
}
finally {
- pageMem.readUnlock(grpId, pageId, page);
+ pageMem.releasePage(grpId, pageId, page);
}
}
- finally {
- pageMem.releasePage(grpId, pageId, page);
+ catch (RuntimeException | AssertionError e) {
+ // Collect all pages from first link to pageId.
+ long[] pageIds;
+
+ try {
+ pageIds = relatedPageIds(grpId, link, pageId, pageMem);
+
+ }
+ catch (IgniteCheckedException e0) {
+ // Ignore exception if failed to resolve related page ids.
+ pageIds = new long[] {pageId};
+ }
+
+ throw new BPlusTreeRuntimeException(e, grpId, pageIds);
}
}
while(nextLink != 0);
@@ -542,6 +561,58 @@ public class CacheDataRowAdapter implements CacheDataRow {
}
/**
+ *
+ * @param grpId Group id.
+ * @param link Link.
+ * @param pageId PageId.
+ * @param pageMem Page memory.
+ * @return Array of page ids from link to pageId.
+ * @throws IgniteCheckedException If failed.
+ */
+ private long[] relatedPageIds(
+ int grpId,
+ long link,
+ long pageId,
+ PageMemory pageMem
+ ) throws IgniteCheckedException {
+ GridLongList pageIds = new GridLongList();
+
+ long nextLink = link;
+ long nextLinkPageId = pageId(nextLink);
+
+ while (nextLinkPageId != pageId) {
+ pageIds.add(nextLinkPageId);
+
+ long page = pageMem.acquirePage(grpId, nextLinkPageId);
+
+ try {
+ long pageAddr = pageMem.readLock(grpId, nextLinkPageId, page);
+
+ try {
+ DataPageIO io = DataPageIO.VERSIONS.forPage(pageAddr);
+
+ int itemId = itemId(nextLink);
+
+ DataPagePayload data = io.readPayload(pageAddr, itemId, pageMem.realPageSize(grpId));
+
+ nextLink = data.nextLink();
+ nextLinkPageId = pageId(nextLink);
+ }
+ finally {
+ pageMem.readUnlock(grpId, nextLinkPageId, page);
+ }
+ }
+ finally {
+ pageMem.releasePage(grpId, nextLinkPageId, page);
+ }
+ }
+
+ pageIds.add(pageId);
+
+ return pageIds.array();
+ }
+
+ /**
* @return {@code True} if entry is ready.
*/
public boolean isReady() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
index d5d93b7..03524e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
@@ -23,6 +23,7 @@ import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteSystemProperties;
@@ -947,20 +948,25 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
releasePage(metaPageId, metaPage);
}
- long firstPage = acquirePage(firstPageId);
-
try {
- long pageAddr = readLock(firstPageId, firstPage); // We always merge pages backwards, the first page is never removed.
+ long firstPage = acquirePage(firstPageId);
try {
- cursor.init(pageAddr, io(pageAddr), -1);
+ long pageAddr = readLock(firstPageId, firstPage); // We always merge pages backwards, the first page is never removed.
+
+ try {
+ cursor.init(pageAddr, io(pageAddr), -1);
+ }
+ finally {
+ readUnlock(firstPageId, firstPage, pageAddr);
+ }
}
finally {
- readUnlock(firstPageId, firstPage, pageAddr);
+ releasePage(firstPageId, firstPage);
}
}
- finally {
- releasePage(firstPageId, firstPage);
+ catch (RuntimeException | AssertionError e) {
+ throw new BPlusTreeRuntimeException(e, grpId, metaPageId, firstPageId);
}
return cursor;
@@ -995,12 +1001,12 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
public final GridCursor<T> find(L lower, L upper, TreeRowClosure<L, T> c, Object x) throws IgniteCheckedException {
checkDestroyed();
+ ForwardCursor cursor = new ForwardCursor(lower, upper, c, x);
+
try {
if (lower == null)
return findLowerUnbounded(upper, c, x);
- ForwardCursor cursor = new ForwardCursor(lower, upper, c, x);
-
cursor.find();
return cursor;
@@ -1009,7 +1015,15 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
throw new IgniteCheckedException("Runtime failure on bounds: [lower=" + lower + ", upper=" + upper + "]", e);
}
catch (RuntimeException | AssertionError e) {
- throw new CorruptedTreeException("Runtime failure on bounds: [lower=" + lower + ", upper=" + upper + "]", e);
+ long[] pageIds = pages(
+ lower == null || cursor == null || cursor.getCursor == null,
+ () -> new long[]{cursor.getCursor.pageId}
+ );
+
+ throw corruptedTreeException(
+ formatMsg("Runtime failure on bounds: [lower=%s, upper=%s]", lower, upper),
+ e, grpId, pageIds
+ );
}
finally {
checkDestroyed();
@@ -1025,19 +1039,19 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
public void iterate(L lower, L upper, TreeRowClosure<L, T> c) throws IgniteCheckedException {
checkDestroyed();
- try {
- ClosureCursor cursor = new ClosureCursor(lower, upper, c);
+ ClosureCursor cursor = new ClosureCursor(lower, upper, c);
+ try {
cursor.iterate();
}
catch (IgniteCheckedException e) {
throw new IgniteCheckedException("Runtime failure on bounds: [lower=" + lower + ", upper=" + upper + "]", e);
}
- catch (RuntimeException e) {
- throw new IgniteException("Runtime failure on bounds: [lower=" + lower + ", upper=" + upper + "]", e);
- }
- catch (AssertionError e) {
- throw new AssertionError("Assertion error on bounds: [lower=" + lower + ", upper=" + upper + "]", e);
+ catch (RuntimeException | AssertionError e) {
+ throw corruptedTreeException(
+ "Runtime failure on bounds: [lower=" + lower + ", upper=" + upper + "]",
+ e, grpId, pages(cursor.getCursor != null, () -> new long[]{cursor.getCursor.pageId})
+ );
}
finally {
checkDestroyed();
@@ -1084,9 +1098,11 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
public T findFirst(TreeRowClosure<L, T> filter) throws IgniteCheckedException {
checkDestroyed();
+ long curPageId = 0L;
+ long nextPageId = 0L;
+
try {
for (;;) {
- long curPageId;
long metaPage = acquirePage(metaPageId);
@@ -1117,7 +1133,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
return getRow(io, curPageAddr, i);
}
- long nextPageId = io.getForward(curPageAddr);
+ nextPageId = io.getForward(curPageAddr);
if (nextPageId == 0)
return null;
@@ -1174,7 +1190,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
throw new IgniteCheckedException("Runtime failure on first row lookup", e);
}
catch (RuntimeException | AssertionError e) {
- throw new CorruptedTreeException("Runtime failure on first row lookup", e);
+ throw corruptedTreeException("Runtime failure on first row lookup", e, grpId, curPageId, nextPageId);
}
finally {
checkDestroyed();
@@ -1196,20 +1212,33 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
public T findLast(final TreeRowClosure<L, T> c) throws IgniteCheckedException {
checkDestroyed();
+ Get g = null;
+
try {
if (c == null) {
- GetOne g = new GetOne(null, null, null, true);
+ g = new GetOne(null, null, null, true);
+
doFind(g);
return (T)g.row;
- } else
- return new GetLast(c).find();
+ }
+ else {
+ GetLast gLast = new GetLast(c);
+
+ g = gLast;
+
+ return gLast.find();
+ }
}
catch (IgniteCheckedException e) {
throw new IgniteCheckedException("Runtime failure on last row lookup", e);
}
catch (RuntimeException | AssertionError e) {
- throw new IgniteException("Runtime failure on last row lookup", e);
+ Get g0 = g;
+
+ long[] pageIds = pages(g == null, () -> new long[]{g0.pageId});
+
+ throw corruptedTreeException("Runtime failure on last row lookup", e, grpId, pageIds);
}
finally {
checkDestroyed();
@@ -1236,9 +1265,9 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
public final <R> R findOne(L row, TreeRowClosure<L, T> c, Object x) throws IgniteCheckedException {
checkDestroyed();
- try {
- GetOne g = new GetOne(row, c, x, false);
+ GetOne g = new GetOne(row, c, x, false);
+ try {
doFind(g);
return (R)g.row;
@@ -1247,7 +1276,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
throw new IgniteCheckedException("Runtime failure on lookup row: " + row, e);
}
catch (RuntimeException | AssertionError e) {
- throw new CorruptedTreeException("Runtime failure on lookup row: " + row, e);
+ throw corruptedTreeException(formatMsg("Runtime failure on lookup row: %s", row), e, grpId, g.pageId);
}
finally {
checkDestroyed();
@@ -1808,7 +1837,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
throw new IgniteCheckedException("Runtime failure on search row: " + row, e);
}
catch (RuntimeException | AssertionError e) {
- throw new CorruptedTreeException("Runtime failure on search row: " + row, e);
+ throw corruptedTreeException(formatMsg("Runtime failure on search row: %s", row), e, grpId, x.pageId);
}
finally {
x.releaseAll();
@@ -1965,7 +1994,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
throw new IgniteCheckedException("Runtime failure on search row: " + row, e);
}
catch (RuntimeException | AssertionError e) {
- throw new CorruptedTreeException("Runtime failure on search row: " + row, e);
+ throw corruptedTreeException(formatMsg("Runtime failure on search row: %s", row), e, grpId, r.pageId);
}
finally {
r.releaseAll();
@@ -2317,7 +2346,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
throw new IgniteCheckedException("Runtime failure on row: " + row, e);
}
catch (RuntimeException | AssertionError e) {
- throw new CorruptedTreeException("Runtime failure on row: " + row, e);
+ throw corruptedTreeException(formatMsg("Runtime failure on row: %s", row), e, grpId, p.pageId);
}
finally {
checkDestroyed();
@@ -5182,6 +5211,9 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
/** */
final L upperBound;
+ /** Cached value for retrieving diagnosting info in case of failure. */
+ public GetCursor getCursor;
+
/**
* @param lowerBound Lower bound.
* @param upperBound Upper bound.
@@ -5321,7 +5353,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
final void find() throws IgniteCheckedException {
assert lowerBound != null;
- doFind(new GetCursor(lowerBound, lowerShift, this));
+ doFind(getCursor = new GetCursor(lowerBound, lowerShift, this));
}
/**
@@ -5373,6 +5405,9 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
readUnlock(pageId, page, pageAddr);
}
}
+ catch (RuntimeException | AssertionError e) {
+ throw corruptedTreeException("Runtime failure on cursor iteration", e, grpId, pageId);
+ }
finally {
releasePage(pageId, page);
}
@@ -5789,4 +5824,50 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
protected int getLockRetries() {
return LOCK_RETRIES;
}
+
+ /**
+ * PageIds converter with empty check.
+ *
+ * @param empty Flag for empty array result.
+ * @param pages Pages supplier.
+ * @return Array of page ids.
+ */
+ private long[] pages(boolean empty, Supplier<long[]> pages) {
+ return empty ? GridLongList.EMPTY_ARRAY : pages.get();
+ }
+
+ /**
+ * Construct the exception and invoke failure processor.
+ *
+ * @param msg Message.
+ * @param cause Cause.
+ * @param grpId Group id.
+ * @param pageIds Pages ids.
+ * @return New CorruptedTreeException instance.
+ */
+ private CorruptedTreeException corruptedTreeException(String msg, Throwable cause, int grpId, long... pageIds) {
+ CorruptedTreeException e = new CorruptedTreeException(msg, cause, grpId, pageIds);
+
+ if (failureProcessor != null)
+ failureProcessor.process(new FailureContext(FailureType.CRITICAL_ERROR, e));
+
+ return e;
+ }
+
+ /**
+ * Creates a formatted message even if "toString" of optioanl parameters failed.
+ *
+ * @param msg Detailed error message.
+ * @param rows Optional parameters.
+ * @return New instance of {@link CorruptedTreeException}.
+ */
+ private String formatMsg(String msg, Object... rows) {
+ try {
+ return String.format(msg, rows);
+ }
+ catch (Throwable ignored) {
+ // Failed to create string representation of optional parameters.
+ return msg + " <failed to create rows string representation>";
+ }
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTreeRuntimeException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTreeRuntimeException.java
new file mode 100644
index 0000000..073e629
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTreeRuntimeException.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.tree;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.internal.util.typedef.T2;
+
+/**
+ * This exception indicates that there's something wrong with B+Tree data integrity. Additional info about corrupted
+ * pages is present in fields.
+ */
+public class BPlusTreeRuntimeException extends RuntimeException {
+ /** */
+ private static final long serialVersionUID = 0L;
+ /** Group id common for all potentially corrupted pages. */
+ private final int grpId;
+ /** Ids of potentially corrupted pages. */
+ private final long[] pageIds;
+
+ /** */
+ public BPlusTreeRuntimeException(Throwable cause, int grpId, long... pageIds) {
+ super(cause);
+
+ this.grpId = grpId;
+ this.pageIds = pageIds;
+ }
+
+ /** Pairs of (groupId, pageId). */
+ public List<T2<Integer, Long>> pages() {
+ List<T2<Integer, Long>> res = new ArrayList<>(pageIds.length);
+
+ for (long pageId : pageIds)
+ res.add(new T2<>(grpId, pageId));
+
+ return res;
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/CorruptedTreeException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/CorruptedTreeException.java
index 12a9f49..9c97a29 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/CorruptedTreeException.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/CorruptedTreeException.java
@@ -16,10 +16,21 @@
package org.apache.ignite.internal.processors.cache.persistence.tree;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.PageIdAllocator;
+import org.apache.ignite.internal.pagemem.PageIdUtils;
import org.apache.ignite.internal.processors.cache.persistence.CorruptedPersistenceException;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
+import static java.util.Arrays.asList;
+
/**
* Exception to distinguish {@link BPlusTree} tree broken invariants.
*/
@@ -27,11 +38,77 @@ public class CorruptedTreeException extends IgniteCheckedException implements Co
/** */
private static final long serialVersionUID = 0L;
+ /** */
+ private final T2<Integer, Long>[] pages;
+
+ /**
+ * @param msg Message.
+ * @param cause Cause.
+ * @param grpId Group id of potentially corrupted pages.
+ * @param pageIds Potentially corrupted pages.
+ */
+ public CorruptedTreeException(String msg, @Nullable Throwable cause, int grpId, long... pageIds) {
+ this(msg, cause, toPagesArray(grpId, pageIds));
+ }
+
/**
* @param msg Message.
* @param cause Cause.
+ * @param pages (groupId, pageId) pairs for pages that might be corrupted.
+ */
+ public CorruptedTreeException(String msg, @Nullable Throwable cause, T2<Integer, Long>... pages) {
+ super(getMsg(msg, pages), cause);
+
+ this.pages = expandPagesArray(pages, cause);
+ }
+
+ /** */
+ private static T2<Integer, Long>[] toPagesArray(int grpId, long[] pageIds) {
+ T2<Integer, Long>[] res = (T2<Integer, Long>[])new T2[pageIds.length];
+
+ for (int i = 0; i < pageIds.length; i++)
+ res[i] = new T2<>(grpId, pageIds[i]);
+
+ return res;
+ }
+
+ /** */
+ private static T2<Integer, Long>[] expandPagesArray(T2<Integer, Long>[] pages, Throwable cause) {
+ Set<T2<Integer, Long>> res = new HashSet<>(asList(pages));
+
+ BPlusTreeRuntimeException treeRuntimeException = X.cause(cause, BPlusTreeRuntimeException.class);
+
+ // Add root exception pages ids if we have.
+ if (treeRuntimeException != null)
+ res.addAll(treeRuntimeException.pages());
+
+ Set<T2<Integer, Long>> partMetaPages = res.stream().map(page -> {
+ int grpId = page.get1();
+ int partId = PageIdUtils.partId(page.get2());
+
+ final long partMetaPageId = PageIdUtils.pageId(partId, PageIdAllocator.FLAG_DATA, 0);
+
+ return new T2<>(grpId, partMetaPageId);
+ }).collect(Collectors.toSet());
+
+ // Add meta pages for all (group,partition) pairs.
+ res.addAll(partMetaPages);
+
+ return (T2<Integer, Long>[])res.toArray(new T2[0]);
+ }
+
+ /** */
+ private static String getMsg(String msg, T2<Integer, Long>... pages) {
+ return S.toString("B+Tree is corrupted",
+ "pages(groupId, pageId)", Arrays.toString(pages), false,
+ "msg", msg, false
+ );
+ }
+
+ /**
+ * @return (groupId, pageId) pairs for pages that might be corrupted.
*/
- public CorruptedTreeException(String msg, @Nullable Throwable cause) {
- super(msg, cause);
+ public T2<Integer, Long>[] pages() {
+ return pages;
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 23c6124..ab6dc2f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -482,6 +482,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
/**
+ * @return Info about of WAL paths.
+ */
+ public SegmentRouter getSegmentRouter() {
+ return segmentRouter;
+ }
+
+ /**
*
*/
private void startArchiverAndCompressor() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentRouter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentRouter.java
index 322705f..f62cca6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentRouter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentRouter.java
@@ -88,6 +88,27 @@ public class SegmentRouter {
}
/**
+ * @return {@code true} If archive folder exists.
+ */
+ public boolean hasArchive() {
+ return !walWorkDir.getAbsolutePath().equals(walArchiveDir.getAbsolutePath());
+ }
+
+ /**
+ * @return WAL working directory.
+ */
+ public File getWalWorkDir() {
+ return walWorkDir;
+ }
+
+ /**
+ * @return WAL archive directory.
+ */
+ public File getWalArchiveDir() {
+ return walArchiveDir;
+ }
+
+ /**
* Returns {@code true} if archiver is enabled.
*/
private boolean isArchiverEnabled() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/FilteredWalIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/FilteredWalIterator.java
new file mode 100644
index 0000000..1cd2999
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/FilteredWalIterator.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.reader;
+
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.function.Predicate;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.util.lang.GridIteratorAdapter;
+import org.apache.ignite.lang.IgniteBiTuple;
+
+/**
+ * Decorator of {@link WALIterator} which allow filter record by {@link WALPointer} and {@link WALRecord}.
+ */
+public class FilteredWalIterator extends GridIteratorAdapter<IgniteBiTuple<WALPointer, WALRecord>>
+ implements WALIterator {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Source WAL iterator which provide data for filtering. */
+ private final WALIterator delegateWalIter;
+
+ /** Filter for filtering iterated data. */
+ private final Predicate<IgniteBiTuple<WALPointer, WALRecord>> filter;
+
+ /** Next record in iterator for supporting iterator pattern. */
+ private IgniteBiTuple<WALPointer, WALRecord> next;
+
+ /**
+ * @param walIterator Source WAL iterator which provide data for filtering.
+ * @param filter Filter for filtering iterated data.
+ */
+ public FilteredWalIterator(WALIterator walIterator,
+ Predicate<IgniteBiTuple<WALPointer, WALRecord>> filter) throws IgniteCheckedException {
+ this.filter = filter == null ? (r) -> true : filter;
+ this.delegateWalIter = walIterator;
+
+ // Initiate iterator by first record.
+ next = nextFilteredRecord();
+ }
+
+ /** {@inheritDoc} **/
+ @Override public Optional<WALPointer> lastRead() {
+ return Optional.ofNullable(next == null ? null : next.get1());
+ }
+
+ /**
+ * @return Next filtered record.
+ */
+ private IgniteBiTuple<WALPointer, WALRecord> nextFilteredRecord() {
+ while (delegateWalIter.hasNext()) {
+ IgniteBiTuple<WALPointer, WALRecord> next = delegateWalIter.next();
+
+ if (filter.test(next))
+ return next;
+ }
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteBiTuple<WALPointer, WALRecord> nextX() throws IgniteCheckedException {
+ if (!hasNextX())
+ throw new NoSuchElementException();
+
+ IgniteBiTuple<WALPointer, WALRecord> cur = next;
+
+ next = nextFilteredRecord();
+
+ return cur;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasNextX() throws IgniteCheckedException {
+ return next != null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeX() throws IgniteCheckedException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws IgniteCheckedException {
+ delegateWalIter.close();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isClosed() {
+ return delegateWalIter.isClosed();
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
index e3f1499..6dd565e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
@@ -172,7 +172,8 @@ public class IgniteWalIteratorFactory {
) throws IgniteCheckedException, IllegalArgumentException {
iteratorParametersBuilder.validate();
- return new StandaloneWalRecordsIterator(log,
+ return new StandaloneWalRecordsIterator(
+ iteratorParametersBuilder.log == null ? log : iteratorParametersBuilder.log,
iteratorParametersBuilder.sharedCtx == null ? prepareSharedCtx(iteratorParametersBuilder) :
iteratorParametersBuilder.sharedCtx,
iteratorParametersBuilder.ioFactory,
@@ -223,9 +224,18 @@ public class IgniteWalIteratorFactory {
) throws IllegalArgumentException {
iteratorParametersBuilder.validate();
- List<T2<Long, Long>> gaps = new ArrayList<>();
+ return hasGaps(resolveWalFiles(iteratorParametersBuilder));
+ }
- List<FileDescriptor> descriptors = resolveWalFiles(iteratorParametersBuilder);
+ /**
+ * @param descriptors File descriptors.
+ * @return List of tuples, low and high index segments with gap.
+ */
+ public List<T2<Long, Long>> hasGaps(
+ @NotNull List<FileDescriptor> descriptors
+ ) throws IllegalArgumentException {
+
+ List<T2<Long, Long>> gaps = new ArrayList<>();
Iterator<FileDescriptor> it = descriptors.iterator();
@@ -376,6 +386,8 @@ public class IgniteWalIteratorFactory {
* Wal iterator parameter builder.
*/
public static class IteratorParametersBuilder {
+ /** Logger. */
+ private IgniteLogger log;
/** */
public static final FileWALPointer DFLT_LOW_BOUND = new FileWALPointer(Long.MIN_VALUE, 0, 0);
@@ -431,6 +443,25 @@ public class IgniteWalIteratorFactory {
private boolean strictBoundsCheck;
/**
+ * Factory method for {@link IgniteWalIteratorFactory.IteratorParametersBuilder}.
+ *
+ * @return Instance of {@link IgniteWalIteratorFactory.IteratorParametersBuilder}.
+ */
+ public static IteratorParametersBuilder withIteratorParameters() {
+ return new IteratorParametersBuilder();
+ }
+
+ /**
+ * @param log Logger.
+ * @return IteratorParametersBuilder Self reference.
+ */
+ public IteratorParametersBuilder log(IgniteLogger log){
+ this.log = log;
+
+ return this;
+ }
+
+ /**
* @param filesOrDirs Paths to files or directories.
* @return IteratorParametersBuilder Self reference.
*/
@@ -537,6 +568,16 @@ public class IgniteWalIteratorFactory {
}
/**
+ * @param filter Record filter for skip records during iteration.
+ * @return IteratorParametersBuilder Self reference.
+ */
+ public IteratorParametersBuilder addFilter(IgniteBiPredicate<RecordType, WALPointer> filter) {
+ this.filter = this.filter == null ? filter : this.filter.and(filter);
+
+ return this;
+ }
+
+ /**
* @param lowBound WAL pointer to start from.
* @return IteratorParametersBuilder Self reference.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
index 4efa337..c2ec57a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
@@ -60,6 +60,7 @@ import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor;
import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor;
import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor;
+import org.apache.ignite.internal.processors.diagnostic.DiagnosticProcessor;
import org.apache.ignite.internal.processors.failure.FailureProcessor;
import org.apache.ignite.internal.processors.hadoop.HadoopHelper;
import org.apache.ignite.internal.processors.hadoop.HadoopProcessorAdapter;
@@ -676,4 +677,9 @@ public class StandaloneGridKernalContext implements GridKernalContext {
@Override public LongJVMPauseDetector longJvmPauseDetector() {
return new LongJVMPauseDetector(log);
}
+
+ /** {@inheritDoc} */
+ @Override public DiagnosticProcessor diagnostic() {
+ return null;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/WalFilters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/WalFilters.java
new file mode 100644
index 0000000..3df54e7
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/WalFilters.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.reader;
+
+import java.util.Set;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
+import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.PageDeltaRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRecord;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.lang.IgniteBiTuple;
+
+/**
+ * Class for holding only very basic WAL filters for using in {@link FilteredWalIterator}. *
+ */
+public class WalFilters {
+ /**
+ * Filtering all checkpoint records.
+ *
+ * @return Predicate for filtering checkpoint records.
+ */
+ public static Predicate<IgniteBiTuple<WALPointer, WALRecord>> checkpoint() {
+ return record -> record.get2() instanceof CheckpointRecord;
+ }
+
+ /**
+ * Filtering all records whose pageId is contained in pageOwnerIds.
+ *
+ * @param pageOwnerIds Page id for filtering.
+ * @return Predicate for filtering record from pageOwnerIds.
+ */
+ public static Predicate<IgniteBiTuple<WALPointer, WALRecord>> pageOwner(Set<T2<Integer, Long>> pageOwnerIds) {
+ return record -> {
+ WALRecord walRecord = record.get2();
+
+ if (walRecord instanceof PageDeltaRecord) {
+ PageDeltaRecord rec0 = (PageDeltaRecord)walRecord;
+
+ return pageOwnerIds.contains(new T2<>(rec0.groupId(), rec0.pageId()));
+ }
+ else if (walRecord instanceof PageSnapshot) {
+ PageSnapshot rec0 = (PageSnapshot)walRecord;
+
+ return pageOwnerIds.contains(new T2<>(rec0.groupId(), rec0.fullPageId().pageId()));
+ }
+
+ return false;
+ };
+ }
+
+ /**
+ * Filtering all records whose partitionId is contained in partsMetaupdate.
+ *
+ * @param partsMetaupdate Partition id for filtering.
+ * @return Predicate for filtering record from pageOwnerIds.
+ */
+ public static Predicate<IgniteBiTuple<WALPointer, WALRecord>> partitionMetaStateUpdate(
+ Set<T2<Integer, Integer>> partsMetaupdate
+ ) {
+ return record -> {
+ WALRecord walRecord = record.get2();
+
+ if (walRecord instanceof PartitionMetaStateRecord) {
+ PartitionMetaStateRecord rec0 = (PartitionMetaStateRecord)walRecord;
+
+ return partsMetaupdate.contains(new T2<>(rec0.groupId(), rec0.partitionId()));
+ }
+
+ return false;
+ };
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/scanner/PrintRawToFileHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/scanner/PrintRawToFileHandler.java
new file mode 100644
index 0000000..7e968f1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/scanner/PrintRawToFileHandler.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.scanner;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer;
+import org.apache.ignite.lang.IgniteBiTuple;
+
+/**
+ * Handler to print raw pages data into file for further diagnostic.
+ */
+public class PrintRawToFileHandler extends PrintToFileHandler {
+ /** */
+ private final RecordSerializer serializer;
+
+ /**
+ * @param file Output file.
+ * @param serializer Serializer for WAL records.
+ */
+ public PrintRawToFileHandler(File file, RecordSerializer serializer) {
+ super(file, null);
+
+ this.serializer = serializer;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected byte[] getBytes(IgniteBiTuple<WALPointer, WALRecord> record) {
+ try {
+ WALRecord walRec = record.get2();
+
+ ByteBuffer buf = ByteBuffer.allocate(serializer.size(walRec));
+
+ serializer.writeRecord(walRec, buf);
+
+ return buf.array();
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected byte[] getHeader() {
+ ByteBuffer buf = ByteBuffer.allocate(RecordV1Serializer.HEADER_RECORD_SIZE);
+
+ buf.order(ByteOrder.nativeOrder());
+
+ FileWriteAheadLogManager.prepareSerializerVersionBuffer(0L, serializer.version(), false, buf);
+
+ return buf.array();
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/scanner/PrintToFileHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/scanner/PrintToFileHandler.java
new file mode 100644
index 0000000..4693700
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/scanner/PrintToFileHandler.java
@@ -0,0 +1,133 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.scanner;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.StandardOpenOption;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.cache.persistence.wal.scanner.ScannerHandlers.DEFAULT_WAL_RECORD_PREFIX;
+
+/**
+ * Handler which print record to file.
+ *
+ * This is not thread safe. Can be used only one time.
+ */
+class PrintToFileHandler implements ScannerHandler {
+ /** */
+ private final FileIOFactory ioFactory;
+
+ /** Target file. */
+ private final File file;
+
+ /** Open file to write. */
+ private FileIO fileToWrite;
+
+ /**
+ * @param file File to write.
+ * @param ioFactory File IO factory.
+ */
+ public PrintToFileHandler(File file, FileIOFactory ioFactory) {
+ this.file = file;
+ this.ioFactory = ioFactory != null ? ioFactory : new DataStorageConfiguration().getFileIOFactory();
+ }
+
+ /** {@inheritDoc} */
+ @Override public final void handle(IgniteBiTuple<WALPointer, WALRecord> record) {
+ initIfRequired();
+
+ writeFully(getBytes(record));
+ }
+
+ /**
+ * @param record WAL record with its pointer.
+ * @return Bytes repersentation of data to be written in dump file.
+ */
+ protected byte[] getBytes(IgniteBiTuple<WALPointer, WALRecord> record) {
+ return (DEFAULT_WAL_RECORD_PREFIX + record.get2() + "\n").getBytes(StandardCharsets.UTF_8);
+ }
+
+ /**
+ * @return Optional header for the diagnostic file. {@code null} if there should be no header.
+ */
+ @Nullable protected byte[] getHeader() {
+ return null;
+ }
+
+ /**
+ * Initialize fileToWrite if it required.
+ */
+ private void initIfRequired() {
+ if (fileToWrite == null) {
+ try {
+ fileToWrite = ioFactory.create(file, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
+ }
+ catch (IOException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ byte[] hdr = getHeader();
+
+ if (hdr != null)
+ writeFully(hdr);
+ }
+
+ /**
+ * Write byte array into file.
+ *
+ * @param bytes Data.
+ * @throws IgniteException If write failed.
+ */
+ private void writeFully(byte[] bytes) {
+ int written = 0;
+
+ try {
+ while ((written += fileToWrite.writeFully(bytes, written, bytes.length - written)) < bytes.length);
+ }
+ catch (IOException ex) {
+ throw new IgniteException(ex);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void finish() {
+ if (fileToWrite == null)
+ return;
+
+ try {
+ try {
+ fileToWrite.force();
+ }
+ finally {
+ fileToWrite.close();
+ }
+ }
+ catch (IOException ex) {
+ throw new IgniteException(ex);
+ }
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/scanner/PrintToLogHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/scanner/PrintToLogHandler.java
new file mode 100644
index 0000000..2d1af63
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/scanner/PrintToLogHandler.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.scanner;
+
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.lang.IgniteBiTuple;
+
+import static org.apache.ignite.internal.processors.cache.persistence.wal.scanner.ScannerHandlers.DEFAULT_WAL_RECORD_PREFIX;
+
+/**
+ * Handler which print record to log.
+ *
+ * This is not thread safe. Can be used only one time.
+ */
+class PrintToLogHandler implements ScannerHandler {
+ /** */
+ private final IgniteLogger log;
+
+ /** */
+ private StringBuilder resultString = new StringBuilder();
+
+ /**
+ * @param log Logger.
+ */
+ public PrintToLogHandler(IgniteLogger log) {
+ this.log = log;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void handle(IgniteBiTuple<WALPointer, WALRecord> record) {
+ ensureNotFinished();
+
+ resultString.append(DEFAULT_WAL_RECORD_PREFIX).append(record.get2()).append("\n");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void finish() {
+ ensureNotFinished();
+
+ String msg = resultString.toString();
+
+ resultString = null;
+
+ log.info(msg);
+ }
+
+ /**
+ *
+ */
+ private void ensureNotFinished() {
+ if (resultString == null)
+ throw new IgniteException("This handler has been already finished.");
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/scanner/ScannerHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/scanner/ScannerHandler.java
new file mode 100644
index 0000000..f88815c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/scanner/ScannerHandler.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.scanner;
+
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.lang.IgniteBiTuple;
+
+/**
+ * Scanner handler which provide ability to do some handling on each record during iteration.
+ */
+public interface ScannerHandler {
+ /**
+ * Handling one more record during iteration over WAL.
+ *
+ * @param record One more record from WAL.
+ */
+ void handle(IgniteBiTuple<WALPointer, WALRecord> record);
+
+ /**
+ * Method which called after all iteration would be finished.
+ */
+ default void finish() {
+ }
+
+ /**
+ * Execute 'then' handler after 'this'.
+ *
+ * @param then Next handler for execution.
+ * @return Composite handler.
+ */
+ default ScannerHandler andThen(ScannerHandler then) {
+ ScannerHandler thiz = this;
+
+ return new ScannerHandler() {
+ @Override public void handle(IgniteBiTuple<WALPointer, WALRecord> record) {
+ try {
+ thiz.handle(record);
+ }
+ finally {
+ then.handle(record);
+ }
+ }
+
+ @Override public void finish() {
+ try {
+ thiz.finish();
+ }
+ finally {
+ then.finish();
+ }
+ }
+ };
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/scanner/ScannerHandlers.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/scanner/ScannerHandlers.java
new file mode 100644
index 0000000..0d69fa4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/scanner/ScannerHandlers.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.scanner;
+
+import java.io.File;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
+
+/**
+ * Holder of {@link ScannerHandlers}.
+ */
+public class ScannerHandlers {
+ /** */
+ public static final String DEFAULT_WAL_RECORD_PREFIX = "Next WAL record :: ";
+
+ /**
+ * @param log Logger.
+ * @return Handler which write record to log.
+ */
+ public static ScannerHandler printToLog(IgniteLogger log) {
+ return new PrintToLogHandler(log);
+ }
+
+ /**
+ * @param file File to write.
+ * @return Handler which write record to file.
+ */
+ public static ScannerHandler printToFile(File file) {
+ return new PrintToFileHandler(file, null);
+ }
+
+ /**
+ * @param file File to write.
+ * @param ioFactory IO factory.
+ * @return Handler which write record to file.
+ */
+ public static ScannerHandler printToFile(File file, FileIOFactory ioFactory) {
+ return new PrintToFileHandler(file, ioFactory);
+ }
+
+ /**
+ * @param file File to write.
+ * @param serializer WAL records serializer.
+ * @return Handler which write record to file.
+ */
+ public static ScannerHandler printRawToFile(File file, RecordSerializer serializer) {
+ return new PrintRawToFileHandler(file, serializer);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/scanner/WalScanner.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/scanner/WalScanner.java
new file mode 100644
index 0000000..fe133a0
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/scanner/WalScanner.java
@@ -0,0 +1,164 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.scanner;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.PageIdUtils;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.processors.cache.persistence.wal.reader.FilteredWalIterator;
+import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
+import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory.IteratorParametersBuilder;
+import org.apache.ignite.internal.util.lang.IgniteThrowableSupplier;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.NotNull;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordPurpose.MIXED;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordPurpose.PHYSICAL;
+import static org.apache.ignite.internal.processors.cache.persistence.wal.reader.WalFilters.checkpoint;
+import static org.apache.ignite.internal.processors.cache.persistence.wal.reader.WalFilters.pageOwner;
+import static org.apache.ignite.internal.processors.cache.persistence.wal.reader.WalFilters.partitionMetaStateUpdate;
+
+/**
+ * Scanning WAL by specific condition.
+ */
+public class WalScanner {
+ /** Parameters for iterator. */
+ private final IteratorParametersBuilder parametersBuilder;
+ /** Wal iterator factory. */
+ private final IgniteWalIteratorFactory iteratorFactory;
+
+ /**
+ * @param parametersBuilder Parameters for iterator.
+ * @param factory Factory of iterator.
+ */
+ WalScanner(
+ IteratorParametersBuilder parametersBuilder,
+ IgniteWalIteratorFactory factory
+ ) {
+ this.parametersBuilder = parametersBuilder;
+ iteratorFactory = factory == null ? new IgniteWalIteratorFactory() : factory;
+ }
+
+ /**
+ * Finding all page physical records whose pageId is contained in given collection.
+ *
+ * @param groupAndPageIds Search pages.
+ * @return Final step for execution some action on result.
+ */
+ @NotNull public WalScanner.ScanTerminateStep findAllRecordsFor(
+ @NotNull Collection<T2<Integer, Long>> groupAndPageIds
+ ) {
+ requireNonNull(groupAndPageIds);
+
+ HashSet<T2<Integer, Long>> groupAndPageIds0 = new HashSet<>(groupAndPageIds);
+
+ // Collect all (group, partition) partition pairs.
+ Set<T2<Integer, Integer>> groupAndParts = groupAndPageIds0.stream()
+ .map((tup) -> new T2<>(tup.get1(), PageIdUtils.partId(tup.get2())))
+ .collect(Collectors.toSet());
+
+ // Build WAL filter. (Checkoint, Page, Partition meta)
+ Predicate<IgniteBiTuple<WALPointer, WALRecord>> filter = checkpoint()
+ .or(pageOwner(groupAndPageIds0))
+ .or(partitionMetaStateUpdate(groupAndParts));
+
+ return new ScanTerminateStep(() -> iterator(filter,
+ parametersBuilder.copy().addFilter((type, pointer) ->
+ // PHYSICAL need fo page shanpshot or delta record.
+ // MIXED need for partiton meta state update.
+ type.purpose() == PHYSICAL || type.purpose() == MIXED
+ )
+ ));
+ }
+
+ /**
+ * @param filter Record filter.
+ * @param parametersBuilder Iterator parameters for customization.
+ * @return Instance of {@link FilteredWalIterator}.
+ * @throws IgniteCheckedException If initialization of iterator will be failed.
+ */
+ @NotNull private FilteredWalIterator iterator(
+ Predicate<IgniteBiTuple<WALPointer, WALRecord>> filter,
+ IteratorParametersBuilder parametersBuilder
+ ) throws IgniteCheckedException {
+ return new FilteredWalIterator(iteratorFactory.iterator(parametersBuilder), filter);
+ }
+
+ /**
+ * Factory method of {@link WalScanner}.
+ *
+ * @param parametersBuilder Iterator parameters for customization.
+ * @return Instance of {@link WalScanner}.
+ */
+ public static WalScanner buildWalScanner(IteratorParametersBuilder parametersBuilder) {
+ return new WalScanner(parametersBuilder, null);
+ }
+
+ /**
+ * Factory method of {@link WalScanner}.
+ *
+ * @param parametersBuilder Iterator parameters for customization.
+ * @param factory Custom instance of {@link IgniteWalIteratorFactory}.
+ * @return Instance of {@link WalScanner}.
+ */
+ public static WalScanner buildWalScanner(
+ IteratorParametersBuilder parametersBuilder,
+ IgniteWalIteratorFactory factory
+ ) {
+ return new WalScanner(parametersBuilder, factory);
+ }
+
+ /**
+ * Terminate state of scanning of WAL for ability to do chaining flow.
+ */
+ public static class ScanTerminateStep {
+ /** WAL iteration supplier. */
+ final IgniteThrowableSupplier<WALIterator> iterSupplier;
+
+ /**
+ * @param iterSupplier WAL iteration supplier.
+ */
+ private ScanTerminateStep(IgniteThrowableSupplier<WALIterator> iterSupplier) {
+ this.iterSupplier = iterSupplier;
+ }
+
+ /**
+ * Execute given handler on each record.
+ *
+ * @param handler Single record handler.
+ * @throws IgniteCheckedException If iteration was failed.
+ */
+ public void forEach(@NotNull ScannerHandler handler) throws IgniteCheckedException {
+ try (WALIterator it = iterSupplier.get()) {
+ while (it.hasNext())
+ handler.handle(it.next());
+ }
+ finally {
+ handler.finish();
+ }
+ }
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/diagnostic/DiagnosticProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/diagnostic/DiagnosticProcessor.java
new file mode 100644
index 0000000..36cb248
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/diagnostic/DiagnosticProcessor.java
@@ -0,0 +1,196 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.diagnostic;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.GridProcessorAdapter;
+import org.apache.ignite.internal.processors.cache.persistence.tree.CorruptedTreeException;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.SB;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.NotNull;
+
+import static org.apache.ignite.internal.processors.diagnostic.DiagnosticProcessor.DiagnosticAction.PRINT_TO_FILE;
+import static org.apache.ignite.internal.processors.diagnostic.DiagnosticProcessor.DiagnosticAction.PRINT_TO_LOG;
+import static org.apache.ignite.internal.processors.diagnostic.DiagnosticProcessor.DiagnosticAction.PRINT_TO_RAW_FILE;
+import static org.apache.ignite.internal.util.IgniteStopwatch.logTime;
+
+/**
+ * Processor which contained helper methods for different diagnostic cases.
+ */
+public class DiagnosticProcessor extends GridProcessorAdapter {
+ /** Time formatter for dump file name. */
+ private static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH-mm-ss_SSS");
+ /** Folder name for store diagnostic info. **/
+ static final String DEFAULT_TARGET_FOLDER = "diagnostic";
+ /** File format. */
+ static final String FILE_FORMAT = ".txt";
+ /** Raw file format. */
+ static final String RAW_FILE_FORMAT = ".raw";
+ /** Full path for store dubug info. */
+ private final Path diagnosticPath;
+
+ private final PageHistoryDiagnoster pageHistoryDiagnoster;
+
+ /**
+ * @param ctx Kernal context.
+ */
+ public DiagnosticProcessor(GridKernalContext ctx) throws IgniteCheckedException {
+ super(ctx);
+
+ diagnosticPath = U.resolveWorkDirectory(ctx.config().getWorkDirectory(), DEFAULT_TARGET_FOLDER, false).toPath();
+
+ pageHistoryDiagnoster = new PageHistoryDiagnoster(ctx, this::diagnosticFile);
+
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onKernalStart(boolean active) throws IgniteCheckedException {
+ super.onKernalStart(active);
+
+ pageHistoryDiagnoster.onStart();
+ }
+
+ /**
+ * Dump all history caches of given page.
+ *
+ * @param builder Parameters of dumping.
+ * @throws IgniteCheckedException If scanning was failed.
+ */
+ public void dumpPageHistory(
+ @NotNull PageHistoryDiagnoster.DiagnosticPageBuilder builder
+ ) throws IgniteCheckedException {
+ logTime(log, "DiagnosticPageHistory", () -> pageHistoryDiagnoster.dumpPageHistory(builder));
+ }
+
+ /**
+ * Print diagnostic info about failure occurred on {@code ignite} instance.
+ * Failure details is contained in {@code failureCtx}.
+ *
+ * @param ignite Ignite instance.
+ * @param failureCtx Failure context.
+ */
+ public void onFailure(Ignite ignite, FailureContext failureCtx) {
+ // If we have some corruption in data structure,
+ // we should scan WAL and print to log and save to file all pages related to corruption for
+ // future investigation.
+ if (X.hasCause(failureCtx.error(), CorruptedTreeException.class)) {
+ CorruptedTreeException corruptedTreeException = X.cause(failureCtx.error(), CorruptedTreeException.class);
+
+ T2<Integer, Long>[] pageIds = corruptedTreeException.pages();
+
+ try {
+ dumpPageHistory(
+ new PageHistoryDiagnoster.DiagnosticPageBuilder()
+ .pageIds(pageIds)
+ .addAction(PRINT_TO_LOG)
+ .addAction(PRINT_TO_FILE)
+ .addAction(PRINT_TO_RAW_FILE)
+ );
+ }
+ catch (IgniteCheckedException e) {
+ SB sb = new SB();
+ sb.a("[");
+
+ for (int i = 0; i < pageIds.length; i++)
+ sb.a("(").a(pageIds[i].get1()).a(",").a(pageIds[i].get2()).a(")");
+
+ sb.a("]");
+
+ ignite.log().error(
+ "Failed to dump diagnostic info on tree corruption. PageIds=" + sb, e);
+ }
+ }
+ }
+
+ /**
+ * Resolve file to store diagnostic info.
+ *
+ * @param customFile Custom file if customized.
+ * @param writeMode Diagnostic file write mode.
+ * @return File to store diagnostic info.
+ */
+ private File diagnosticFile(File customFile, DiagnosticFileWriteMode writeMode) {
+ if (customFile == null)
+ return finalizeFile(diagnosticPath, writeMode);
+
+ if (customFile.isAbsolute())
+ return finalizeFile(customFile.toPath(), writeMode);
+
+ return finalizeFile(diagnosticPath.resolve(customFile.toPath()), writeMode);
+ }
+
+ /**
+ * @param diagnosticPath Path to diagnostic file.
+ * @param writeMode Diagnostic file write mode.
+ * @return File to store diagnostic info.
+ */
+ private static File finalizeFile(Path diagnosticPath, DiagnosticFileWriteMode writeMode) {
+ diagnosticPath.toFile().mkdirs();
+
+ return diagnosticPath.resolve(LocalDateTime.now().format(TIME_FORMATTER) + getFileExtension(writeMode)).toFile();
+ }
+
+ /**
+ * Get file format for given write mode.
+ *
+ * @param writeMode Diagnostic file write mode.
+ * @return File extention with dot.
+ */
+ private static String getFileExtension(DiagnosticFileWriteMode writeMode) {
+ switch (writeMode) {
+ case HUMAN_READABLE:
+ return FILE_FORMAT;
+
+ case RAW:
+ return RAW_FILE_FORMAT;
+
+ default:
+ throw new IllegalArgumentException("writeMode=" + writeMode);
+ }
+ }
+
+ /**
+ * Possible action after WAL scanning.
+ */
+ public enum DiagnosticAction {
+ /** Print result to log. */
+ PRINT_TO_LOG,
+ /** Print result to file. */
+ PRINT_TO_FILE,
+ /** Print result to file in raw format. */
+ PRINT_TO_RAW_FILE
+ }
+
+ /**
+ * Mode of diagnostic dump file.
+ */
+ public enum DiagnosticFileWriteMode {
+ /** Use humanly readable data representation. */
+ HUMAN_READABLE,
+ /** Use raw data format. */
+ RAW
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/diagnostic/PageHistoryDiagnoster.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/diagnostic/PageHistoryDiagnoster.java
new file mode 100644
index 0000000..3572fc8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/diagnostic/PageHistoryDiagnoster.java
@@ -0,0 +1,381 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.diagnostic;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.pagemem.PageIdUtils;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentRouter;
+import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
+import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory.IteratorParametersBuilder;
+import org.apache.ignite.internal.processors.cache.persistence.wal.scanner.ScannerHandler;
+import org.apache.ignite.internal.processors.cache.persistence.wal.scanner.WalScanner.ScanTerminateStep;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl;
+import org.apache.ignite.internal.processors.diagnostic.DiagnosticProcessor.DiagnosticFileWriteMode;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.NotNull;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory.IteratorParametersBuilder.withIteratorParameters;
+import static org.apache.ignite.internal.processors.cache.persistence.wal.reader.WalFilters.checkpoint;
+import static org.apache.ignite.internal.processors.cache.persistence.wal.reader.WalFilters.pageOwner;
+import static org.apache.ignite.internal.processors.cache.persistence.wal.reader.WalFilters.partitionMetaStateUpdate;
+import static org.apache.ignite.internal.processors.cache.persistence.wal.scanner.ScannerHandlers.printToFile;
+import static org.apache.ignite.internal.processors.cache.persistence.wal.scanner.ScannerHandlers.printToLog;
+import static org.apache.ignite.internal.processors.cache.persistence.wal.scanner.ScannerHandlers.printRawToFile;
+import static org.apache.ignite.internal.processors.cache.persistence.wal.scanner.WalScanner.buildWalScanner;
+
+/**
+ * Diagnostic WAL page history.
+ */
+public class PageHistoryDiagnoster {
+ /** Kernal context. */
+ @GridToStringExclude
+ protected final GridKernalContext ctx;
+ /** Diagnostic logger. */
+ @GridToStringExclude
+ protected final IgniteLogger log;
+
+ /** Wal folders to scan. */
+ private File[] walFolders;
+
+ /** Function to provide target end file to store diagnostic info. */
+ private final BiFunction<File, DiagnosticFileWriteMode, File> targetFileSupplier;
+
+ private final IgniteWalIteratorFactory iteratorFactory = new IgniteWalIteratorFactory();
+
+ /** */
+ private volatile FileWriteAheadLogManager wal;
+
+ /**
+ * @param ctx Kernal context.
+ * @param supplier Function to provide target end file to store diagnostic info.
+ */
+ public PageHistoryDiagnoster(GridKernalContext ctx, BiFunction<File, DiagnosticFileWriteMode, File> supplier) {
+ log = ctx.log(getClass());
+ this.ctx = ctx;
+ targetFileSupplier = supplier;
+ }
+
+ /**
+ * Do action on start.
+ */
+ public void onStart() {
+ FileWriteAheadLogManager wal = (FileWriteAheadLogManager)ctx.cache().context().wal();
+
+ if (wal == null)
+ return;
+
+ this.wal = wal;
+
+ SegmentRouter segmentRouter = wal.getSegmentRouter();
+
+ if (segmentRouter.hasArchive())
+ walFolders = new File[] {segmentRouter.getWalArchiveDir(), segmentRouter.getWalWorkDir()};
+ else
+ walFolders = new File[] {segmentRouter.getWalWorkDir()};
+ }
+
+ /**
+ * Dump all history caches of given page.
+ *
+ * @param builder Parameters of dumping.
+ * @throws IgniteCheckedException If scanning was failed.
+ */
+ public void dumpPageHistory(
+ @NotNull PageHistoryDiagnoster.DiagnosticPageBuilder builder
+ ) throws IgniteCheckedException {
+ if (walFolders == null) {
+ log.info("Skipping dump page history due to WAL not configured");
+
+ return;
+ }
+
+ ScannerHandler action = null;
+
+ for (DiagnosticProcessor.DiagnosticAction act : builder.actions) {
+ if (action == null)
+ action = toHandler(act, builder.dumpFolder);
+ else
+ action = action.andThen(toHandler(act, builder.dumpFolder));
+ }
+
+ requireNonNull(action, "Should be configured at least one action");
+
+ IteratorParametersBuilder params = withIteratorParameters()
+ .log(log)
+ .filesOrDirs(walFolders);
+
+ // Resolve available WAL segment files.
+ List<FileDescriptor> descs = iteratorFactory.resolveWalFiles(params);
+
+ int descIdx = -1;
+ FileWALPointer reserved = null;
+
+ for (int i = 0; i < descs.size(); i++) {
+ // Try resever minimal available segment.
+ if (wal.reserve(reserved = new FileWALPointer(descs.get(i).idx(), 0, 0))) {
+ descIdx = i;
+
+ break;
+ }
+ }
+
+ if (descIdx == -1) {
+ log.info("Skipping dump page history due to can not reserve WAL segments: " + descToString(descs));
+
+ return;
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Reserverd WAL segment idx: " + reserved.index());
+
+ // Check gaps in the reserved interval.
+ List<T2<Long, Long>> gaps = iteratorFactory.hasGaps(descs.subList(descIdx, descs.size()));
+
+ if (!gaps.isEmpty())
+ log.warning("Potentialy missed record because WAL has gaps: " + gapsToString(gaps));
+
+ try {
+ scan(builder, params, action, reserved);
+ }
+ finally {
+ assert reserved != null;
+
+ wal.release(reserved);
+
+ if (log.isDebugEnabled())
+ log.debug("Release WAL segment idx:" + reserved.index());
+ }
+ }
+
+ /**
+ * @param builder Diagnostic parameter builder.
+ * @param params Iterator parameter builder.
+ * @param action Action.
+ * @param from Pointer from replay.
+ */
+ private void scan(
+ PageHistoryDiagnoster.DiagnosticPageBuilder builder,
+ IteratorParametersBuilder params,
+ ScannerHandler action,
+ FileWALPointer from
+ ) throws IgniteCheckedException {
+ IgniteBiTuple<WALPointer, WALRecord> lastReadRec = null;
+ // Try scan via WAL manager. More safety way on working node.
+ try {
+ Set<T2<Integer, Long>> groupAndPageIds0 = new HashSet<>(builder.pageIds);
+
+ // Collect all (group, partition) partition pairs.
+ Set<T2<Integer, Integer>> groupAndParts = groupAndPageIds0.stream()
+ .map((tup) -> new T2<>(tup.get1(), PageIdUtils.partId(tup.get2())))
+ .collect(Collectors.toSet());
+
+ // Build WAL filter. (Checkoint, Page, Partition meta)
+ Predicate<IgniteBiTuple<WALPointer, WALRecord>> filter = checkpoint()
+ .or(pageOwner(groupAndPageIds0))
+ .or(partitionMetaStateUpdate(groupAndParts));
+
+ try (WALIterator it = wal.replay(from)) {
+ while (it.hasNext()) {
+ IgniteBiTuple<WALPointer, WALRecord> recTup = lastReadRec = it.next();
+
+ if (filter.test(recTup))
+ action.handle(recTup);
+ }
+ }
+ finally {
+ action.finish();
+ }
+
+ return;
+
+ }
+ catch (IgniteCheckedException e) {
+ if (lastReadRec != null) {
+ log.warning("Failed to diagnosric scan via WAL manager, lastReadRec:["
+ + lastReadRec.get1() + ", " + lastReadRec.get2() + "]",e);
+ }
+ else
+ log.warning("Failed to diagnosric scan via WAL manager", e);
+ }
+
+ // Try scan via stand alone iterator is not safety if wal still generated and moving to archive.
+ // Build scanner for pageIds from reserved pointer.
+ ScanTerminateStep scanner = buildWalScanner(params.from(from)).findAllRecordsFor(builder.pageIds);
+
+ scanner.forEach(action);
+ }
+
+ /**
+ * @param descs WAL file descriptors.
+ * @return String representation.
+ */
+ private String descToString(List<FileDescriptor> descs) {
+ StringBuilder sb = new StringBuilder();
+
+ sb.append("[");
+
+ Iterator<FileDescriptor> iter = descs.iterator();
+
+ while (iter.hasNext()) {
+ FileDescriptor desc = iter.next();
+
+ sb.append(desc.idx());
+
+ if (!iter.hasNext())
+ sb.append(", ");
+ }
+
+ sb.append("]");
+
+ return sb.toString();
+ }
+
+ /**
+ * @param gaps WAL file gaps.
+ * @return String representation.
+ */
+ private String gapsToString(Collection<T2<Long, Long>> gaps) {
+ StringBuilder sb = new StringBuilder();
+
+ sb.append("[");
+
+ Iterator<T2<Long, Long>> iter = gaps.iterator();
+
+ while (iter.hasNext()) {
+ T2<Long, Long> gap = iter.next();
+
+ sb.append("(").append(gap.get1()).append("..").append(gap.get2()).append(")");
+
+ if (!iter.hasNext())
+ sb.append(", ");
+ }
+
+ sb.append("]");
+
+ return sb.toString();
+ }
+
+ /**
+ * @param action Action for converting.
+ * @param customFile File to store diagnostic info.
+ * @return {@link ScannerHandler} for handle records.
+ */
+ private ScannerHandler toHandler(DiagnosticProcessor.DiagnosticAction action, File customFile) {
+ switch (action) {
+ case PRINT_TO_LOG:
+ return printToLog(log);
+
+ case PRINT_TO_FILE:
+ return printToFile(targetFileSupplier.apply(customFile, DiagnosticFileWriteMode.HUMAN_READABLE));
+
+ case PRINT_TO_RAW_FILE:
+ return printRawToFile(targetFileSupplier.apply(customFile, DiagnosticFileWriteMode.RAW), serializer());
+
+ default:
+ throw new IllegalArgumentException("Unknown diagnostic action : " + action);
+ }
+ }
+
+ /**
+ * @return WAL records serializer.
+ * @throws IgniteException If serializer initialization failed for some reason.
+ */
+ private RecordSerializer serializer() {
+ GridCacheSharedContext<?, ?> cctx = ctx.cache().context();
+
+ int serializerVer = cctx.wal().serializerVersion();
+
+ try {
+ return new RecordSerializerFactoryImpl(cctx).createSerializer(serializerVer);
+ }
+ catch (IgniteCheckedException e) {
+ log.error(
+ "Failed to create WAL records serializer for diagnostic purposes [serializerVer=" + serializerVer + "]"
+ );
+
+ throw new IgniteException(e);
+ }
+ }
+
+ /**
+ * Parameters for diagnostic pages.
+ */
+ public static class DiagnosticPageBuilder {
+ /** Pages for searching in WAL. */
+ List<T2<Integer, Long>> pageIds = new ArrayList<>();
+ /** Action after which should be executed after WAL scanning . */
+ Set<DiagnosticProcessor.DiagnosticAction> actions = EnumSet.noneOf(DiagnosticProcessor.DiagnosticAction.class);
+ /** Folder for dump diagnostic info. */
+ File dumpFolder;
+
+ /**
+ * @param pageIds Pages for searching in WAL.
+ * @return This instance for chaining.
+ */
+ public DiagnosticPageBuilder pageIds(T2<Integer, Long>... pageIds) {
+ this.pageIds.addAll(Arrays.asList(pageIds));
+
+ return this;
+ }
+
+ /**
+ * @param action Action after which should be executed after WAL scanning .
+ * @return This instance for chaining.
+ */
+ public DiagnosticPageBuilder addAction(@NotNull DiagnosticProcessor.DiagnosticAction action) {
+ this.actions.add(action);
+
+ return this;
+ }
+
+ /**
+ * @param file Folder for dump diagnostic info.
+ * @return This instance for chaining.
+ */
+ public DiagnosticPageBuilder folderForDump(@NotNull File file) {
+ this.dumpFolder = file;
+
+ return this;
+ }
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/failure/FailureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/failure/FailureProcessor.java
index 866351c..1fb55e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/failure/FailureProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/failure/FailureProcessor.java
@@ -28,6 +28,7 @@ import org.apache.ignite.failure.StopNodeOrHaltFailureHandler;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.cache.persistence.CorruptedPersistenceException;
+import org.apache.ignite.internal.processors.diagnostic.DiagnosticProcessor;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -142,6 +143,11 @@ public class FailureProcessor extends GridProcessorAdapter {
if (IGNITE_DUMP_THREADS_ON_FAILURE)
U.dumpThreads(log);
+ DiagnosticProcessor diagnosticProcessor = ctx.diagnostic();
+
+ if (diagnosticProcessor != null)
+ diagnosticProcessor.onFailure(ignite, failureCtx);
+
boolean invalidated = hnd.onFailure(ignite, failureCtx);
if (invalidated) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteStopwatch.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteStopwatch.java
index 0cad3c4..51ef932 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteStopwatch.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteStopwatch.java
@@ -22,6 +22,9 @@ package org.apache.ignite.internal.util;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.lang.IgniteThrowableRunner;
import org.jetbrains.annotations.NotNull;
import static java.util.concurrent.TimeUnit.DAYS;
@@ -117,6 +120,36 @@ public final class IgniteStopwatch {
}
/**
+ * Execution given operation and calculation it time.
+ *
+ * @param log Logger fol logging.
+ * @param operationName Operation name for logging.
+ * @param operation Operation for execution.
+ * @throws IgniteCheckedException If failed.
+ */
+ public static void logTime(
+ IgniteLogger log,
+ String operationName,
+ IgniteThrowableRunner operation
+ ) throws IgniteCheckedException {
+ long start = System.currentTimeMillis();
+
+ log.info("Operation was started: operation = " + operationName);
+ try {
+ operation.run();
+ }
+ catch (Throwable ex) {
+ log.info("Operation was failed: operation = " + operationName
+ + ", elapsedTime = " + (System.currentTimeMillis() - start) + " ms");
+
+ throw ex;
+ }
+
+ log.info("Operation was success: operation = " + operationName
+ + ", elapsedTime = " + (System.currentTimeMillis() - start) + " ms");
+ }
+
+ /**
* Default constructor.
*/
IgniteStopwatch() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableConsumer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableConsumer.java
index b151841..40584af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableConsumer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableConsumer.java
@@ -22,15 +22,17 @@ import org.apache.ignite.IgniteCheckedException;
/**
* Represents an operation that accepts a single input argument and returns no result. Unlike most other functional
* interfaces, {@code IgniteThrowableConsumer} is expected to operate via side-effects.
- * Also it is able to throw {@link IgniteCheckedException} unlike {@link java.util.function.Function}.
*
- * @param <E> Type of closure parameter.
+ * Also it is able to throw {@link IgniteCheckedException} unlike {@link java.util.function.Consumer}.
+ *
+ * @param <E> The type of the input to the operation.
*/
+@FunctionalInterface
public interface IgniteThrowableConsumer<E> extends Serializable {
/**
- * Consumer body.
+ * Performs this operation on the given argument.
*
- * @param e Consumer parameter.
+ * @param e the input argument
* @throws IgniteCheckedException If body execution was failed.
*/
public void accept(E e) throws IgniteCheckedException;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableConsumer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableSupplier.java
similarity index 62%
copy from modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableConsumer.java
copy to modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableSupplier.java
index b151841..36b8ce0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableConsumer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableSupplier.java
@@ -1,12 +1,12 @@
/*
* Copyright 2019 GridGain Systems, Inc. and Contributors.
- *
+ *
* Licensed under the GridGain Community Edition License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,18 +20,20 @@ import java.io.Serializable;
import org.apache.ignite.IgniteCheckedException;
/**
- * Represents an operation that accepts a single input argument and returns no result. Unlike most other functional
- * interfaces, {@code IgniteThrowableConsumer} is expected to operate via side-effects.
- * Also it is able to throw {@link IgniteCheckedException} unlike {@link java.util.function.Function}.
+ * Represents a supplier of results. There is no requirement that a new or distinct result be returned each
+ * time the supplier is invoked.
+ *
+ * Also it is able to throw {@link IgniteCheckedException} unlike {@link java.util.function.Supplier}.
*
- * @param <E> Type of closure parameter.
+ * @param <E> The type of results supplied by this supplier.
*/
-public interface IgniteThrowableConsumer<E> extends Serializable {
+@FunctionalInterface
+public interface IgniteThrowableSupplier<E> extends Serializable {
/**
- * Consumer body.
+ * Gets a result.
*
- * @param e Consumer parameter.
- * @throws IgniteCheckedException If body execution was failed.
+ * @return a result
+ * @throws IgniteCheckedException If result calculation failed.
*/
- public void accept(E e) throws IgniteCheckedException;
+ public E get() throws IgniteCheckedException;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/lang/IgniteBiPredicate.java b/modules/core/src/main/java/org/apache/ignite/lang/IgniteBiPredicate.java
index cba385c..af76653 100644
--- a/modules/core/src/main/java/org/apache/ignite/lang/IgniteBiPredicate.java
+++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteBiPredicate.java
@@ -17,6 +17,7 @@
package org.apache.ignite.lang;
import java.io.Serializable;
+import java.util.Objects;
/**
* Defines a predicate which accepts two parameters and returns {@code true} or {@code false}.
@@ -33,4 +34,26 @@ public interface IgniteBiPredicate<E1, E2> extends Serializable {
* @return Return value.
*/
public boolean apply(E1 e1, E2 e2);
-}
\ No newline at end of file
+
+ /**
+ * Returns a composed predicate that represents a short-circuiting logical
+ * AND of this predicate and another. When evaluating the composed
+ * predicate, if this predicate is {@code false}, then the {@code other}
+ * predicate is not evaluated.
+ *
+ * <p>Any exceptions thrown during evaluation of either predicate are relayed
+ * to the caller; if evaluation of this predicate throws an exception, the
+ * {@code other} predicate will not be evaluated.
+ *
+ * @param then a predicate that will be logically-ANDed with this
+ * predicate
+ * @return a composed predicate that represents the short-circuiting logical
+ * AND of this predicate and the {@code other} predicate
+ * @throws NullPointerException if other is null
+ */
+ default IgniteBiPredicate<E1, E2> and(IgniteBiPredicate<E1, E2> then) {
+ Objects.requireNonNull(then);
+
+ return (p1, p2) -> apply(p1, p2) && then.apply(p1, p2);
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/CorruptedTreeFailureHandlingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/CorruptedTreeFailureHandlingTest.java
new file mode 100644
index 0000000..b25a618
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/CorruptedTreeFailureHandlingTest.java
@@ -0,0 +1,255 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.file.OpenOption;
+import java.nio.file.StandardOpenOption;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.io.FileUtils;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.FailureHandler;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.pagemem.FullPageId;
+import org.apache.ignite.internal.pagemem.PageIdUtils;
+import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.tree.CorruptedTreeException;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc;
+import org.apache.ignite.internal.processors.cache.tree.AbstractDataLeafIO;
+import org.apache.ignite.internal.processors.cache.tree.DataLeafIO;
+import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccDataLeafIO;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/** */
+public class CorruptedTreeFailureHandlingTest extends GridCommonAbstractTest implements Serializable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private static final int CACHE_ENTRIES = 10;
+
+ /** Partition file with corrupted page. */
+ private final AtomicReference<File> fileRef = new AtomicReference<>();
+
+ /** Link to corrupted page. */
+ private final AtomicLong linkRef = new AtomicLong();
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setConsistentId(gridName);
+
+ DataStorageConfiguration dataStorageConfiguration = new DataStorageConfiguration();
+
+ dataStorageConfiguration.getDefaultDataRegionConfiguration().setPersistenceEnabled(true);
+ dataStorageConfiguration.setFileIOFactory(new CollectLinkFileIOFactory());
+
+ cfg.setDataStorageConfiguration(dataStorageConfiguration);
+
+ cfg.setCacheConfiguration(new CacheConfiguration<>()
+ .setName(DEFAULT_CACHE_NAME)
+ .setAffinity(new RendezvousAffinityFunction().setPartitions(1))
+ .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+ );
+
+ return cfg;
+ }
+
+ /** */
+ @Before
+ public void before() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /** */
+ @After
+ public void after() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected FailureHandler getFailureHandler(String igniteInstanceName) {
+ return new StopNodeFailureHandler();
+ }
+
+ /** */
+ @Test
+ public void testCorruptedPage() throws Exception {
+ IgniteEx srv = startGrid(0);
+
+ File diagnosticDir = new File(srv.context().config().getWorkDirectory(), "diagnostic");
+
+ FileUtils.deleteDirectory(diagnosticDir);
+
+ srv.cluster().active(true);
+
+ IgniteCache<Integer, Integer> cache = srv.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+ for (int i = 0; i < 10; i++)
+ cache.put(i, i);
+
+ int pageSize = srv.configuration().getDataStorageConfiguration().getPageSize();
+
+ int grpId = srv.context().cache().cacheGroups().stream().filter(
+ context -> context.cacheOrGroupName().equals(DEFAULT_CACHE_NAME)
+ ).findAny().orElseThrow(() -> new RuntimeException("Cache group not found")).groupId();
+
+ stopGrid(0, false);
+
+ // Node is stopped, we're ready to corrupt partition data.
+ long link = linkRef.get();
+ long pageId = PageIdUtils.pageId(link);
+ int itemId = PageIdUtils.itemId(link);
+
+ ByteBuffer pageBuf = ByteBuffer.allocateDirect(pageSize);
+
+ OpenOption[] options = {StandardOpenOption.READ, StandardOpenOption.WRITE};
+ try (RandomAccessFileIO fileIO = new RandomAccessFileIO(fileRef.get(), options)) {
+ DataPageIO dataPageIO = DataPageIO.VERSIONS.latest();
+
+ long pageOff = pageSize + PageIdUtils.pageIndex(pageId) * pageSize;
+
+ // Read index page.
+ fileIO.position(pageOff);
+ fileIO.readFully(pageBuf);
+
+ long pageAddr = GridUnsafe.bufferAddress(pageBuf);
+
+ // Remove existing item from index page.
+ dataPageIO.removeRow(pageAddr, itemId, pageSize);
+
+ // Recalculate CRC.
+ PageIO.setCrc(pageAddr, 0);
+
+ pageBuf.rewind();
+ PageIO.setCrc(pageAddr, FastCrc.calcCrc(pageBuf, pageSize));
+
+ // Write it back.
+ pageBuf.rewind();
+ fileIO.position(pageOff);
+ fileIO.writeFully(pageBuf);
+ }
+
+ srv = startGrid(0);
+
+ // Add modified page to WAL so it won't be restored to previous (valid) state.
+ pageBuf.rewind();
+ ByteBuffer cpBuf = ByteBuffer.allocate(pageBuf.capacity());
+ cpBuf.put(pageBuf);
+
+ PageSnapshot pageSnapshot = new PageSnapshot(new FullPageId(pageId, grpId), cpBuf.array(), pageSize);
+
+ srv.context().cache().context().wal().log(pageSnapshot);
+
+ // Access cache.
+ cache = srv.cache(DEFAULT_CACHE_NAME);
+
+ try {
+ for (int i = 0; i < CACHE_ENTRIES; i++)
+ cache.get(i);
+
+ fail("Cache operations are expected to fail");
+ }
+ catch (Throwable e) {
+ assertTrue(X.hasCause(e, CorruptedTreeException.class));
+ }
+
+ assertTrue(GridTestUtils.waitForCondition(() -> G.allGrids().isEmpty(), 10_000L));
+
+ assertTrue(diagnosticDir.exists());
+ assertTrue(diagnosticDir.isDirectory());
+
+ File[] txtFiles = diagnosticDir.listFiles((dir, name) -> name.endsWith(".txt"));
+
+ assertTrue(txtFiles != null && txtFiles.length == 1);
+
+ File[] rawFiles = diagnosticDir.listFiles((dir, name) -> name.endsWith(".raw"));
+
+ assertTrue(rawFiles != null && rawFiles.length == 1);
+ }
+
+ /** */
+ private class CollectLinkFileIOFactory implements FileIOFactory {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final FileIOFactory delegateFactory = new RandomAccessFileIOFactory();
+
+ /** {@inheritDoc} */
+ @Override public FileIO create(File file, OpenOption... modes) throws IOException {
+ FileIO fileIO = delegateFactory.create(file, modes);
+
+ return new FileIODecorator(fileIO) {
+ @Override public int write(ByteBuffer srcBuf, long position) throws IOException {
+ int type = PageIO.getType(srcBuf);
+
+ AbstractDataLeafIO dataLeafIO = null;
+
+ if (type == PageIO.T_DATA_REF_LEAF)
+ dataLeafIO = DataLeafIO.VERSIONS.latest();
+
+ if (type == PageIO.T_DATA_REF_MVCC_LEAF)
+ dataLeafIO = MvccDataLeafIO.VERSIONS.latest();
+
+ if (dataLeafIO != null) {
+ long pageAddr = GridUnsafe.bufferAddress(srcBuf);
+
+ int itemIdx = dataLeafIO.getCount(pageAddr) - 1;
+
+ linkRef.set(dataLeafIO.getLink(pageAddr, itemIdx));
+
+ fileRef.set(file);
+ }
+
+ srcBuf.rewind();
+
+ return super.write(srcBuf, position);
+ }
+ };
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/FilteredWalIteratorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/FilteredWalIteratorTest.java
new file mode 100644
index 0000000..0877661
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/FilteredWalIteratorTest.java
@@ -0,0 +1,214 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.reader;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.function.Predicate;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.FullPageId;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
+import org.apache.ignite.internal.pagemem.wal.record.MetastoreDataRecord;
+import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRecord;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.Mockito;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordPurpose.PHYSICAL;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.EXCHANGE;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.HEADER_RECORD;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.METASTORE_DATA_RECORD;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.PAGE_RECORD;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.PART_META_UPDATE_STATE;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.when;
+
+/**
+ *
+ */
+@RunWith(Parameterized.class)
+public class FilteredWalIteratorTest {
+ /** Count of different records sequence per filter. */
+ private static final int ITERATORS_COUNT_PER_FILTER = 20;
+ /** Count of records on one iterator. */
+ private static final int RECORDS_COUNT_IN_ITERATOR = 30;
+ /** **/
+ private static Random random = new Random();
+ /** **/
+ private static FileWALPointer ZERO_POINTER = new FileWALPointer(0, 0, 0);
+ /** **/
+ private static IgniteBiTuple<WALPointer, WALRecord> TEST_RECORD = new IgniteBiTuple<>(
+ ZERO_POINTER, new MetastoreDataRecord("key", new byte[0])
+ );
+
+ /** Customized iterator for test. */
+ private WALIterator mockedIter;
+ /** Iterator filter for test. */
+ private Predicate<IgniteBiTuple<WALPointer, WALRecord>> filter;
+ /** Expected result for iterator and filter. */
+ private List<IgniteBiTuple<WALPointer, WALRecord>> expRes;
+
+ /**
+ * @param nameOfCase Case name. It required only for printing test name.
+ * @param mockedIter Basic WAL iterator.
+ * @param filter Filter by which record should be filtered.
+ * @param expRes Expected record result.
+ */
+ public FilteredWalIteratorTest(
+ String nameOfCase,
+ WALIterator mockedIter,
+ Predicate<IgniteBiTuple<WALPointer, WALRecord>> filter,
+ List<IgniteBiTuple<WALPointer, WALRecord>> expRes) {
+ this.mockedIter = mockedIter;
+ this.filter = filter;
+ this.expRes = expRes;
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ @Test
+ public void shouldReturnCorrectlyFilteredRecords() throws IgniteCheckedException {
+ FilteredWalIterator filteredIter = new FilteredWalIterator(mockedIter, filter);
+
+ List<IgniteBiTuple<WALPointer, WALRecord>> ans = new ArrayList<>();
+ try (WALIterator it = filteredIter) {
+ while (it.hasNext())
+ ans.add(it.next());
+ }
+
+ assertNotNull(ans);
+ assertEquals(expRes, ans);
+ }
+
+ /**
+ * @return Datas for test.
+ */
+ @Parameterized.Parameters(name = "{0} case №{index}")
+ public static Iterable<Object[]> providedTestData() {
+ ArrayList<Object[]> res = new ArrayList<>();
+
+ res.addAll(prepareTestCaseData("PhysicalFilter", r -> r.get2().type().purpose() == PHYSICAL));
+ res.addAll(prepareTestCaseData("CheckpointFilter", r -> r.get2() instanceof CheckpointRecord));
+
+ return res;
+ }
+
+ /**
+ * Prepare bunch of data given filter.
+ *
+ * @param testCaseName Human readable name of filter.
+ * @param filter Filter for test.
+ * @return Prepared data.
+ */
+ private static List<Object[]> prepareTestCaseData(
+ String testCaseName,
+ Predicate<IgniteBiTuple<WALPointer, WALRecord>> filter
+ ) {
+ ArrayList<Object[]> res = new ArrayList<>(ITERATORS_COUNT_PER_FILTER);
+
+ Boolean[] hasNextReturn = new Boolean[RECORDS_COUNT_IN_ITERATOR + 1];
+ Arrays.fill(hasNextReturn, true);
+ hasNextReturn[RECORDS_COUNT_IN_ITERATOR] = false;
+
+ for (int i = 0; i < ITERATORS_COUNT_PER_FILTER; i++) {
+ List<IgniteBiTuple<WALPointer, WALRecord>> tuples = randomRecords();
+
+ WALIterator mockedIter = Mockito.mock(WALIterator.class);
+ when(mockedIter.hasNext()).thenReturn(true, hasNextReturn);
+ when(mockedIter.next()).thenReturn(TEST_RECORD, tuples.toArray(new IgniteBiTuple[] {}));
+
+ res.add(new Object[] {testCaseName, mockedIter, filter, tuples.stream().filter(filter).collect(toList())});
+ }
+
+ return res;
+ }
+
+ /**
+ * @return Random records list for iteration.
+ */
+ private static List<IgniteBiTuple<WALPointer, WALRecord>> randomRecords() {
+ ArrayList<IgniteBiTuple<WALPointer, WALRecord>> res = new ArrayList<>(RECORDS_COUNT_IN_ITERATOR);
+
+ for (int i = 0; i < RECORDS_COUNT_IN_ITERATOR; i++)
+ res.add(randomRecord());
+
+ return res;
+ }
+
+ /**
+ * @return Random test record.
+ */
+ private static IgniteBiTuple<WALPointer, WALRecord> randomRecord() {
+ int recordId = random.nextInt(9);
+
+ switch (recordId) {
+ case 0:
+ return new IgniteBiTuple<>(ZERO_POINTER, new MetastoreDataRecord("key", new byte[0]));
+ case 1:
+ return new IgniteBiTuple<>(ZERO_POINTER, new CheckpointRecord(new FileWALPointer(5738, 0, 0)));
+ case 2:
+ return new IgniteBiTuple<>(ZERO_POINTER, new PageSnapshot(new FullPageId(1, 1), new byte[0], 10));
+ case 3:
+ return new IgniteBiTuple<>(ZERO_POINTER, new PartitionMetaStateRecord(1, 1, OWNING, 1));
+ case 4:
+ return new IgniteBiTuple<>(ZERO_POINTER, new CustomizeTypeRecord(METASTORE_DATA_RECORD));
+ case 5:
+ return new IgniteBiTuple<>(ZERO_POINTER, new CustomizeTypeRecord(PAGE_RECORD));
+ case 6:
+ return new IgniteBiTuple<>(ZERO_POINTER, new CustomizeTypeRecord(PART_META_UPDATE_STATE));
+ case 7:
+ return new IgniteBiTuple<>(ZERO_POINTER, new CustomizeTypeRecord(HEADER_RECORD));
+ case 8:
+ return new IgniteBiTuple<>(ZERO_POINTER, new CustomizeTypeRecord(EXCHANGE));
+ }
+
+ return null;
+ }
+
+ /**
+ * Test class for represent record with different type.
+ */
+ public static class CustomizeTypeRecord extends WALRecord {
+ /** **/
+ private final RecordType type;
+
+ /**
+ * @param type Custom type for this record.
+ */
+ public CustomizeTypeRecord(RecordType type) {
+ this.type = type;
+ }
+
+ /** {@inheritDoc} */
+ @Override public RecordType type() {
+ return type;
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/scanner/WalScannerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/scanner/WalScannerTest.java
new file mode 100644
index 0000000..84442c1
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/scanner/WalScannerTest.java
@@ -0,0 +1,337 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.scanner;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.pagemem.FullPageId;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
+import org.apache.ignite.internal.pagemem.wal.record.MetastoreDataRecord;
+import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.FixCountRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRecord;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
+import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory.IteratorParametersBuilder;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
+import static org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory.IteratorParametersBuilder.withIteratorParameters;
+import static org.apache.ignite.internal.processors.cache.persistence.wal.scanner.ScannerHandlers.printToFile;
+import static org.apache.ignite.internal.processors.cache.persistence.wal.scanner.ScannerHandlers.printToLog;
+import static org.apache.ignite.internal.processors.cache.persistence.wal.scanner.WalScanner.buildWalScanner;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ *
+ */
+public class WalScannerTest {
+ /** **/
+ private static final String TEST_DUMP_FILE = "output.txt";
+ /** **/
+ private static FileWALPointer ZERO_POINTER = new FileWALPointer(0, 0, 0);
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void shouldFindCorrectRecords() throws Exception {
+ // given: Iterator with random value and value which should be find by scanner.
+ long expPageId = 984;
+ int grpId = 123;
+
+ PageSnapshot expPageSnapshot = new PageSnapshot(new FullPageId(expPageId, grpId), new byte[0], 10);
+ CheckpointRecord expCheckpoint = new CheckpointRecord(new FileWALPointer(5738, 0, 0));
+ FixCountRecord expDeltaPage = new FixCountRecord(grpId, expPageId, 4);
+
+ WALIterator mockedIter = mockWalIterator(
+ new IgniteBiTuple<>(ZERO_POINTER, expPageSnapshot),
+ new IgniteBiTuple<>(ZERO_POINTER, new PageSnapshot(new FullPageId(455, grpId), new byte[0], 10)),
+ new IgniteBiTuple<>(ZERO_POINTER, expCheckpoint),
+ new IgniteBiTuple<>(ZERO_POINTER, new MetastoreDataRecord("key", new byte[0])),
+ new IgniteBiTuple<>(ZERO_POINTER, new PartitionMetaStateRecord(grpId, 1, OWNING, 1)),
+ new IgniteBiTuple<>(ZERO_POINTER, expDeltaPage),
+ new IgniteBiTuple<>(ZERO_POINTER, new FixCountRecord(grpId, 98348, 4))
+ );
+
+ IgniteWalIteratorFactory mockedFactory = mock(IgniteWalIteratorFactory.class);
+ when(mockedFactory.iterator(any(IteratorParametersBuilder.class))).thenReturn(mockedIter);
+
+ // Test scanner handler for holding found value instead of printing its.
+ List<WALRecord> holder = new ArrayList<>();
+ ScannerHandler recordCaptor = (rec) -> holder.add(rec.get2());
+
+ Set<T2<Integer,Long>> groupAndPageIds = new HashSet<>();
+
+ groupAndPageIds.add(new T2<>(grpId, expPageId));
+
+ // when: Scanning WAL for searching expected page.
+ buildWalScanner(withIteratorParameters(), mockedFactory)
+ .findAllRecordsFor(groupAndPageIds)
+ .forEach(recordCaptor);
+
+ // then: Should be find only expected value.
+ assertEquals(holder.size(), 3);
+
+ assertEquals(expPageSnapshot, holder.get(0));
+ assertEquals(expCheckpoint, holder.get(1));
+ assertEquals(expDeltaPage, holder.get(2));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void shouldFindCorrectRecordsForMoreThanOnePages() throws Exception {
+ // given: Iterator with random value and value which should be find by scanner with several ids.
+ long expPageId1 = 984;
+ long expPageId2 = 9584;
+ long expPageId3 = 98344;
+
+ int grpId = 123;
+
+ PageSnapshot expPageSnapshot = new PageSnapshot(new FullPageId(expPageId1, grpId), new byte[0], 10);
+ CheckpointRecord expCheckpoint = new CheckpointRecord(new FileWALPointer(5738, 0, 0));
+ FixCountRecord expDeltaPage1 = new FixCountRecord(grpId, expPageId2, 4);
+ FixCountRecord expDeltaPage2 = new FixCountRecord(grpId, expPageId3, 4);
+
+ WALIterator mockedIter = mockWalIterator(
+ new IgniteBiTuple<>(ZERO_POINTER, expPageSnapshot),
+ new IgniteBiTuple<>(ZERO_POINTER, new PageSnapshot(new FullPageId(455, grpId), new byte[0], 10)),
+ new IgniteBiTuple<>(ZERO_POINTER, expCheckpoint),
+ new IgniteBiTuple<>(ZERO_POINTER, new MetastoreDataRecord("key", new byte[0])),
+ new IgniteBiTuple<>(ZERO_POINTER, new PartitionMetaStateRecord(grpId, 1, OWNING, 1)),
+ new IgniteBiTuple<>(ZERO_POINTER, expDeltaPage1),
+ new IgniteBiTuple<>(ZERO_POINTER, new FixCountRecord(grpId, 98348, 4)),
+ new IgniteBiTuple<>(ZERO_POINTER, new PartitionMetaStateRecord(grpId, 1, OWNING, 1)),
+ new IgniteBiTuple<>(ZERO_POINTER, expDeltaPage2)
+ );
+
+ IgniteWalIteratorFactory mockedFactory = mock(IgniteWalIteratorFactory.class);
+ when(mockedFactory.iterator(any(IteratorParametersBuilder.class))).thenReturn(mockedIter);
+
+ List<WALRecord> holder = new ArrayList<>();
+ ScannerHandler recordCaptor = (rec) -> holder.add(rec.get2());
+
+ Set<T2<Integer,Long>> groupAndPageIds = new HashSet<>();
+
+ groupAndPageIds.add(new T2<>(grpId, expPageId1));
+ groupAndPageIds.add(new T2<>(grpId, expPageId2));
+ groupAndPageIds.add(new T2<>(grpId, expPageId3));
+
+ // when: Scanning WAL for searching expected page.
+ buildWalScanner(withIteratorParameters(), mockedFactory)
+ .findAllRecordsFor(groupAndPageIds)
+ .forEach(recordCaptor);
+
+ // then: Should be find only expected value.
+ assertEquals(4, holder.size());
+
+ assertEquals(expPageSnapshot, holder.get(0));
+ assertEquals(expCheckpoint, holder.get(1));
+ assertEquals(expDeltaPage1, holder.get(2));
+ assertEquals(expDeltaPage2, holder.get(3));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void shouldDumpToLogFoundRecord() throws Exception {
+ // given: Test logger for interception of logging.
+ long expPageId = 984;
+ int grpId = 123;
+
+ IgniteLogger log = mock(IgniteLogger.class);
+
+ ArgumentCaptor<String> valCapture = ArgumentCaptor.forClass(String.class);
+ doNothing().when(log).info(valCapture.capture());
+
+ WALIterator mockedIter = mockWalIterator(
+ new IgniteBiTuple<>(ZERO_POINTER, new PageSnapshot(new FullPageId(expPageId, grpId), new byte[0], 10)),
+ new IgniteBiTuple<>(ZERO_POINTER, new CheckpointRecord(new FileWALPointer(5738, 0, 0))),
+ new IgniteBiTuple<>(ZERO_POINTER, new FixCountRecord(grpId, expPageId, 4))
+ );
+
+ IgniteWalIteratorFactory factory = mock(IgniteWalIteratorFactory.class);
+ when(factory.iterator(any(IteratorParametersBuilder.class))).thenReturn(mockedIter);
+
+ Set<T2<Integer,Long>> groupAndPageIds = new HashSet<>();
+
+ groupAndPageIds.add(new T2<>(grpId, expPageId));
+
+ // when: Scanning WAL for searching expected page.
+ buildWalScanner(withIteratorParameters(), factory)
+ .findAllRecordsFor(groupAndPageIds)
+ .forEach(printToLog(log));
+
+ // then: Should be find only expected value from log.
+ List<String> actualRecords = valCapture.getAllValues();
+
+ assertEquals(actualRecords.size(), 1);
+
+ assertTrue(actualRecords.get(0), actualRecords.get(0).contains("PageSnapshot ["));
+ assertTrue(actualRecords.get(0), actualRecords.get(0).contains("CheckpointRecord ["));
+ assertTrue(actualRecords.get(0), actualRecords.get(0).contains("FixCountRecord ["));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void shouldDumpToFileFoundRecord() throws Exception {
+ // given: File for dumping records.
+ File targetFile = Paths.get(U.defaultWorkDirectory(), TEST_DUMP_FILE).toFile();
+
+ long expectedPageId = 984;
+ int grpId = 123;
+
+ WALIterator mockedIter = mockWalIterator(
+ new IgniteBiTuple<>(ZERO_POINTER, new PageSnapshot(new FullPageId(expectedPageId, grpId), new byte[0], 10)),
+ new IgniteBiTuple<>(ZERO_POINTER, new CheckpointRecord(new FileWALPointer(5738, 0, 0))),
+ new IgniteBiTuple<>(ZERO_POINTER, new FixCountRecord(grpId, expectedPageId, 4))
+ );
+
+ IgniteWalIteratorFactory factory = mock(IgniteWalIteratorFactory.class);
+ when(factory.iterator(any(IteratorParametersBuilder.class))).thenReturn(mockedIter);
+
+ Set<T2<Integer,Long>> groupAndPageIds = new HashSet<>();
+
+ groupAndPageIds.add(new T2<>(grpId, expectedPageId));
+
+ List<String> actualRecords;
+
+ try {
+ // when: Scanning WAL for searching expected page.
+ buildWalScanner(withIteratorParameters(), factory)
+ .findAllRecordsFor(groupAndPageIds)
+ .forEach(printToFile(targetFile));
+
+ actualRecords = Files.readAllLines(targetFile.toPath());
+ }
+ finally {
+ targetFile.delete();
+ }
+
+ // then: Should be find only expected value from file.
+ assertEquals(actualRecords.size(), 3);
+
+ assertTrue(actualRecords.get(0), actualRecords.get(0).contains("PageSnapshot ["));
+ assertTrue(actualRecords.get(1), actualRecords.get(1).contains("CheckpointRecord ["));
+ assertTrue(actualRecords.get(2), actualRecords.get(2).contains("FixCountRecord ["));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void shouldDumpToFileAndLogFoundRecord() throws Exception {
+ // given: File for dumping records and test logger for interception of records.
+ File targetFile = Paths.get(U.defaultWorkDirectory(), TEST_DUMP_FILE).toFile();
+
+ long expPageId = 984;
+ int grpId = 123;
+
+ IgniteLogger log = mock(IgniteLogger.class);
+
+ ArgumentCaptor<String> valCapture = ArgumentCaptor.forClass(String.class);
+ doNothing().when(log).info(valCapture.capture());
+
+ WALIterator mockedIter = mockWalIterator(
+ new IgniteBiTuple<>(ZERO_POINTER, new PageSnapshot(new FullPageId(expPageId, grpId), new byte[0], 10)),
+ new IgniteBiTuple<>(ZERO_POINTER, new CheckpointRecord(new FileWALPointer(5738, 0, 0))),
+ new IgniteBiTuple<>(ZERO_POINTER, new FixCountRecord(grpId, expPageId, 4))
+ );
+
+ IgniteWalIteratorFactory factory = mock(IgniteWalIteratorFactory.class);
+ when(factory.iterator(any(IteratorParametersBuilder.class))).thenReturn(mockedIter);
+
+ Set<T2<Integer,Long>> groupAndPageIds = new HashSet<>();
+
+ groupAndPageIds.add(new T2<>(grpId, expPageId));
+
+ List<String> actualFileRecords = null;
+
+ try {
+ // when: Scanning WAL for searching expected page.
+ buildWalScanner(withIteratorParameters(), factory)
+ .findAllRecordsFor(groupAndPageIds)
+ .forEach(printToLog(log).andThen(printToFile(targetFile)));
+
+ actualFileRecords = Files.readAllLines(targetFile.toPath());
+ }
+ finally {
+ targetFile.delete();
+ }
+
+ // then: Should be find only expected value from file.
+ assertEquals(actualFileRecords.size(), 3);
+
+ assertTrue(actualFileRecords.get(0), actualFileRecords.get(0).contains("PageSnapshot ["));
+ assertTrue(actualFileRecords.get(1), actualFileRecords.get(1).contains("CheckpointRecord ["));
+ assertTrue(actualFileRecords.get(2), actualFileRecords.get(2).contains("FixCountRecord ["));
+
+ // then: Should be find only expected value from log.
+ List<String> actualLogRecords = valCapture.getAllValues();
+
+ assertEquals(actualLogRecords.size(), 1);
+
+ assertTrue(actualLogRecords.get(0), actualLogRecords.get(0).contains("PageSnapshot ["));
+ assertTrue(actualLogRecords.get(0), actualLogRecords.get(0).contains("CheckpointRecord ["));
+ assertTrue(actualLogRecords.get(0), actualLogRecords.get(0).contains("FixCountRecord ["));
+ }
+
+ /**
+ * @param first Not null first value for return.
+ * @param tail Other values.
+ * @return Mocked WAL iterator.
+ */
+ private WALIterator mockWalIterator(
+ IgniteBiTuple<WALPointer, WALRecord> first,
+ IgniteBiTuple<WALPointer, WALRecord>... tail
+ ) {
+ Boolean[] hasNextReturn = new Boolean[tail.length + 1];
+ Arrays.fill(hasNextReturn, true);
+ hasNextReturn[tail.length] = false;
+
+ WALIterator mockedIter = mock(WALIterator.class);
+ when(mockedIter.hasNext()).thenReturn(true, hasNextReturn);
+
+ when(mockedIter.next()).thenReturn(first, tail);
+
+ return mockedIter;
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/diagnostic/DiagnosticProcessorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/diagnostic/DiagnosticProcessorTest.java
new file mode 100644
index 0000000..87ac685
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/diagnostic/DiagnosticProcessorTest.java
@@ -0,0 +1,269 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.diagnostic;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.List;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIO;
+import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO;
+import org.apache.ignite.internal.processors.cache.persistence.wal.io.SimpleSegmentFileInputFactory;
+import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
+import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory.IteratorParametersBuilder;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.SegmentHeader;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.diagnostic.DiagnosticProcessor.DEFAULT_TARGET_FOLDER;
+import static org.apache.ignite.internal.processors.diagnostic.DiagnosticProcessor.DiagnosticAction.PRINT_TO_FILE;
+import static org.apache.ignite.internal.processors.diagnostic.DiagnosticProcessor.DiagnosticAction.PRINT_TO_LOG;
+import static org.apache.ignite.internal.processors.diagnostic.DiagnosticProcessor.DiagnosticAction.PRINT_TO_RAW_FILE;
+import static org.apache.ignite.internal.processors.diagnostic.DiagnosticProcessor.FILE_FORMAT;
+import static org.apache.ignite.internal.processors.diagnostic.DiagnosticProcessor.RAW_FILE_FORMAT;
+
+/**
+ *
+ */
+public class DiagnosticProcessorTest extends GridCommonAbstractTest {
+ /** Cache name. */
+ private static final String CACHE_NAME = "cache0";
+ /** Test directory for dump. */
+ private static final String TEST_DUMP_PAGE_FILE = "testDumpPage";
+
+ /** One time configured diagnosticProcessor. */
+ private static DiagnosticProcessor diagnosticProcessor;
+ /** One time configured page id for searching. */
+ private static T2<Integer, Long> expectedPageId;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ DataStorageConfiguration dsCfg = new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration()
+ .setMaxSize(1024L * 1024 * 1024)
+ .setPersistenceEnabled(true));
+
+ cfg.setDataStorageConfiguration(dsCfg);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ stopAllGrids();
+
+ cleanPersistenceDir();
+
+ try {
+ IgniteEx ignite = startGrid("node0");
+
+ ignite.cluster().active(true);
+
+ ignite.createCache(CACHE_NAME);
+ try (IgniteDataStreamer<Integer, Integer> st = ignite.dataStreamer(CACHE_NAME)) {
+ st.allowOverwrite(true);
+
+ for (int i = 0; i < 10_000; i++)
+ st.addData(i, i);
+ }
+
+ diagnosticProcessor = ignite.context().diagnostic();
+ expectedPageId = findAnyPageId();
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), DEFAULT_TARGET_FOLDER, false));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ diagnosticProcessor = null;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void dumpPageHistoryToDefaultDir() throws Exception {
+ diagnosticProcessor.dumpPageHistory(new PageHistoryDiagnoster.DiagnosticPageBuilder()
+ .pageIds(expectedPageId)
+ .addAction(PRINT_TO_LOG)
+ .addAction(PRINT_TO_FILE)
+ );
+
+ Path path = Paths.get(U.defaultWorkDirectory(), DEFAULT_TARGET_FOLDER);
+ File dumpFile = path.toFile().listFiles((dir, name) -> name.endsWith(FILE_FORMAT))[0];
+
+ List<String> records = Files.readAllLines(dumpFile.toPath());
+
+ assertTrue(!records.isEmpty());
+
+ assertTrue(records.stream().anyMatch(line -> line.contains("CheckpointRecord")));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void dumpRawPageHistoryToDefaultDir() throws Exception {
+ diagnosticProcessor.dumpPageHistory(new PageHistoryDiagnoster.DiagnosticPageBuilder()
+ .pageIds(expectedPageId)
+ .addAction(PRINT_TO_RAW_FILE)
+ );
+
+ Path path = Paths.get(U.defaultWorkDirectory(), DEFAULT_TARGET_FOLDER);
+ File dumpFile = path.toFile().listFiles((dir, name) -> name.endsWith(RAW_FILE_FORMAT))[0];
+
+ try (SegmentIO io = new SegmentIO(0L, new RandomAccessFileIO(dumpFile, StandardOpenOption.READ))) {
+ SegmentHeader hdr = RecordV1Serializer.readSegmentHeader(io, new SimpleSegmentFileInputFactory());
+
+ assertFalse(hdr.isCompacted());
+
+ assertEquals(RecordSerializerFactory.LATEST_SERIALIZER_VERSION, hdr.getSerializerVersion());
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void dumpPageHistoryToCustomAbsoluteDir() throws Exception {
+ Path path = Paths.get(U.defaultWorkDirectory(), TEST_DUMP_PAGE_FILE);
+ try {
+ diagnosticProcessor.dumpPageHistory(new PageHistoryDiagnoster.DiagnosticPageBuilder()
+ .pageIds(expectedPageId)
+ .folderForDump(path.toFile())
+ .addAction(PRINT_TO_FILE)
+ );
+
+ File dumpFile = path.toFile().listFiles((dir, name) -> name.endsWith(FILE_FORMAT))[0];
+
+ List<String> records = Files.readAllLines(dumpFile.toPath());
+
+ assertTrue(!records.isEmpty());
+
+ assertTrue(records.stream().anyMatch(line -> line.contains("CheckpointRecord")));
+ }
+ finally {
+ U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), TEST_DUMP_PAGE_FILE, false));
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void dumpPageHistoryToCustomRelativeDir() throws Exception {
+ Path path = Paths.get(U.defaultWorkDirectory(), DEFAULT_TARGET_FOLDER, TEST_DUMP_PAGE_FILE);
+
+ diagnosticProcessor.dumpPageHistory(new PageHistoryDiagnoster.DiagnosticPageBuilder()
+ .pageIds(expectedPageId)
+ .folderForDump(new File(TEST_DUMP_PAGE_FILE))
+ .addAction(PRINT_TO_FILE)
+ );
+
+ File dumpFile = path.toFile().listFiles((dir, name) -> name.endsWith(FILE_FORMAT))[0];
+
+ List<String> records = Files.readAllLines(dumpFile.toPath());
+
+ assertTrue(!records.isEmpty());
+
+ assertTrue(records.stream().anyMatch(line -> line.contains("CheckpointRecord")));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void dumpOnlyCheckpointRecordBecausePageIdNotSet() throws Exception {
+ diagnosticProcessor.dumpPageHistory(new PageHistoryDiagnoster.DiagnosticPageBuilder()
+ .addAction(PRINT_TO_LOG)
+ .addAction(PRINT_TO_FILE)
+ );
+
+ Path path = Paths.get(U.defaultWorkDirectory(), DEFAULT_TARGET_FOLDER);
+
+ File dumpFile = path.toFile().listFiles((dir, name) -> name.endsWith(FILE_FORMAT))[0];
+
+ List<String> records = Files.readAllLines(dumpFile.toPath());
+
+ assertTrue(records.stream().allMatch(line -> line.contains("CheckpointRecord")));
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ @Test(expected = NullPointerException.class)
+ public void throwExceptionBecauseNotAnyActionsWasSet() throws IgniteCheckedException {
+ diagnosticProcessor.dumpPageHistory(new PageHistoryDiagnoster.DiagnosticPageBuilder()
+ .pageIds(expectedPageId)
+ );
+ }
+
+ /**
+ * Find first any page id for test.
+ *
+ * @return Page id in WAL.
+ * @throws org.apache.ignite.IgniteCheckedException If failed.
+ */
+ private T2<Integer, Long> findAnyPageId() throws org.apache.ignite.IgniteCheckedException {
+ IgniteWalIteratorFactory factory = new IgniteWalIteratorFactory();
+
+ try (WALIterator it = factory.iterator(new IteratorParametersBuilder().filesOrDirs(U.defaultWorkDirectory()))) {
+ while (it.hasNext()) {
+ WALRecord record = it.next().get2();
+
+ if (record instanceof PageSnapshot){
+ PageSnapshot rec = (PageSnapshot)record;
+
+ return new T2<>(rec.groupId(), rec.fullPageId().pageId());
+ }
+ }
+ }
+
+ throw new IgniteCheckedException();
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
index 57472e0..f9c1686 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
@@ -56,6 +56,7 @@ import org.apache.ignite.internal.processors.database.IgniteDbMultiNodePutGetTes
import org.apache.ignite.internal.processors.database.IgniteDbPutGetWithCacheStoreTest;
import org.apache.ignite.internal.processors.database.IgniteDbSingleNodePutGetTest;
import org.apache.ignite.internal.processors.database.IgniteDbSingleNodeTinyPutGetTest;
+import org.apache.ignite.internal.processors.diagnostic.DiagnosticProcessorTest;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.runner.RunWith;
import org.junit.runners.AllTests;
@@ -179,5 +180,8 @@ public class IgnitePdsTestSuite {
//MetaStorage
GridTestUtils.addTestIfNeeded(suite, IgniteMetaStorageBasicTest.class, ignoredTests);
+
+ //Diagnostic
+ GridTestUtils.addTestIfNeeded(suite, DiagnosticProcessorTest.class, ignoredTests);
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index bd6c3fc..1cc6393 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -73,7 +73,9 @@ import org.apache.ignite.internal.processors.cache.persistence.db.wal.crc.Ignite
import org.apache.ignite.internal.processors.cache.persistence.db.wal.crc.IgniteStandaloneWalIteratorInvalidCrcTest;
import org.apache.ignite.internal.processors.cache.persistence.db.wal.crc.IgniteWithoutArchiverWalIteratorInvalidCrcTest;
import org.apache.ignite.internal.processors.cache.persistence.db.wal.reader.IgniteWalReaderTest;
+import org.apache.ignite.internal.processors.cache.persistence.wal.reader.FilteredWalIteratorTest;
import org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneWalRecordsIteratorTest;
+import org.apache.ignite.internal.processors.cache.persistence.wal.scanner.WalScannerTest;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.runner.RunWith;
import org.junit.runners.AllTests;
@@ -217,6 +219,10 @@ public class IgnitePdsTestSuite2 extends TestSuite {
GridTestUtils.addTestIfNeeded(suite, StandaloneWalRecordsIteratorTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, FilteredWalIteratorTest.class, ignoredTests);
+
+ GridTestUtils.addTestIfNeeded(suite, WalScannerTest.class, ignoredTests);
+
//GridTestUtils.addTestIfNeeded(suite, IgniteWalRecoverySeveralRestartsTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, IgniteRebalanceScheduleResendPartitionsTest.class, ignoredTests);
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
index 248f8ca..c199c3c 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cache.ResetLostPartitionTest;
import org.apache.ignite.internal.processors.cache.IgniteClusterActivateDeactivateTestWithPersistenceAndMemoryReuse;
import org.apache.ignite.internal.processors.cache.distributed.CachePageWriteLockUnlockTest;
import org.apache.ignite.internal.processors.cache.distributed.rebalancing.IgniteRebalanceOnCachesStoppingOrDestroyingTest;
+import org.apache.ignite.internal.processors.cache.persistence.CorruptedTreeFailureHandlingTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuousRestartTestWithSharedGroupAndIndexes;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsRecoveryAfterFileCorruptionTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsTaskCancelingTest;
@@ -65,6 +66,7 @@ public class IgnitePdsTestSuite4 {
GridTestUtils.addTestIfNeeded(suite, CachePageWriteLockUnlockTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, IgnitePdsCacheWalDisabledOnRebalancingTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, IgnitePdsStartWIthEmptyArchive.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, CorruptedTreeFailureHandlingTest.class, ignoredTests);
return suite;
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java b/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java
index a015a46..ee8d557 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java
@@ -54,6 +54,7 @@ import org.apache.ignite.internal.processors.cache.persistence.DbCheckpointListe
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.cache.persistence.tree.CorruptedTreeException;
import org.apache.ignite.internal.processors.cache.verify.GridNotIdleException;
import org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility;
import org.apache.ignite.internal.processors.cache.verify.PartitionKey;
@@ -68,6 +69,7 @@ import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
import org.apache.ignite.internal.util.lang.GridIterator;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.resources.IgniteInstanceResource;
@@ -75,6 +77,7 @@ import org.apache.ignite.resources.LoggerResource;
import org.h2.engine.Session;
import org.h2.index.Cursor;
import org.h2.index.Index;
+import org.h2.message.DbException;
import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX;
import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
@@ -673,10 +676,15 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
if (!cursor.next())
break;
}
- catch (IllegalStateException e) {
- throw new IgniteCheckedException("Key is present in SQL index, but is missing in corresponding " +
- "data page. Previous successfully read key: " +
- CacheObjectUtils.unwrapBinaryIfNeeded(ctx.cacheObjectContext(), previousKey, true, true), e);
+ catch (DbException e) {
+ if (X.hasCause(e, CorruptedTreeException.class))
+ throw new IgniteCheckedException("Key is present in SQL index, but is missing in corresponding " +
+ "data page. Previous successfully read key: " +
+ CacheObjectUtils.unwrapBinaryIfNeeded(ctx.cacheObjectContext(), previousKey, true, true),
+ X.cause(e, CorruptedTreeException.class)
+ );
+
+ throw e;
}
GridH2Row h2Row = (GridH2Row)cursor.get();