You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ir...@apache.org on 2018/05/16 17:14:06 UTC
ignite git commit: IGNITE-8499 validate_indexes command doesn't
detect absent rows in cache data tree
Repository: ignite
Updated Branches:
refs/heads/master 771e8619e -> 88a6bfdd9
IGNITE-8499 validate_indexes command doesn't detect absent rows in cache data tree
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/88a6bfdd
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/88a6bfdd
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/88a6bfdd
Branch: refs/heads/master
Commit: 88a6bfdd9c7faea3d230b9959c773900b94356b1
Parents: 771e861
Author: Ivan Rakov <ir...@apache.org>
Authored: Wed May 16 20:13:30 2018 +0300
Committer: Ivan Rakov <ir...@apache.org>
Committed: Wed May 16 20:13:30 2018 +0300
----------------------------------------------------------------------
.../internal/commandline/CommandHandler.java | 21 +-
.../verify/ValidateIndexesPartitionResult.java | 31 ++-
.../verify/VisorValidateIndexesJobResult.java | 38 ++-
.../visor/verify/ValidateIndexesClosure.java | 264 ++++++++++++++-----
.../visor/verify/VisorValidateIndexesTask.java | 6 +-
.../IgniteCacheWithIndexingTestSuite.java | 3 +
.../util/GridCommandHandlerIndexingTest.java | 203 +++++++++++++-
7 files changed, 477 insertions(+), 89 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/88a6bfdd/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java
index 7d457fd..04578e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java
@@ -100,8 +100,8 @@ import static org.apache.ignite.internal.commandline.Command.BASELINE;
import static org.apache.ignite.internal.commandline.Command.CACHE;
import static org.apache.ignite.internal.commandline.Command.DEACTIVATE;
import static org.apache.ignite.internal.commandline.Command.STATE;
-import static org.apache.ignite.internal.commandline.Command.WAL;
import static org.apache.ignite.internal.commandline.Command.TX;
+import static org.apache.ignite.internal.commandline.Command.WAL;
import static org.apache.ignite.internal.visor.baseline.VisorBaselineOperation.ADD;
import static org.apache.ignite.internal.visor.baseline.VisorBaselineOperation.COLLECT;
import static org.apache.ignite.internal.visor.baseline.VisorBaselineOperation.REMOVE;
@@ -635,9 +635,9 @@ public class CommandHandler {
boolean errors = false;
for (Map.Entry<UUID, VisorValidateIndexesJobResult> nodeEntry : taskRes.results().entrySet()) {
- Map<PartitionKey, ValidateIndexesPartitionResult> map = nodeEntry.getValue().response();
+ Map<PartitionKey, ValidateIndexesPartitionResult> partRes = nodeEntry.getValue().partitionResult();
- for (Map.Entry<PartitionKey, ValidateIndexesPartitionResult> e : map.entrySet()) {
+ for (Map.Entry<PartitionKey, ValidateIndexesPartitionResult> e : partRes.entrySet()) {
ValidateIndexesPartitionResult res = e.getValue();
if (!res.issues().isEmpty()) {
@@ -649,6 +649,21 @@ public class CommandHandler {
log(is.toString());
}
}
+
+ Map<String, ValidateIndexesPartitionResult> idxRes = nodeEntry.getValue().indexResult();
+
+ for (Map.Entry<String, ValidateIndexesPartitionResult> e : idxRes.entrySet()) {
+ ValidateIndexesPartitionResult res = e.getValue();
+
+ if (!res.issues().isEmpty()) {
+ errors = true;
+
+ log("SQL Index " + e.getKey() + " " + e.getValue().toString());
+
+ for (IndexValidationIssue is : res.issues())
+ log(is.toString());
+ }
+ }
}
if (!errors)
http://git-wip-us.apache.org/repos/asf/ignite/blob/88a6bfdd/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesPartitionResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesPartitionResult.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesPartitionResult.java
index 1889960..5d74a57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesPartitionResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesPartitionResult.java
@@ -29,7 +29,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.visor.VisorDataTransferObject;
/**
- *
+ * Encapsulates intermediate results of validation of SQL index (if {@link #sqlIdxName} is present) or partition.
*/
public class ValidateIndexesPartitionResult extends VisorDataTransferObject {
/** */
@@ -52,6 +52,10 @@ public class ValidateIndexesPartitionResult extends VisorDataTransferObject {
@GridToStringExclude
private List<IndexValidationIssue> issues = new ArrayList<>(10);
+ /** Sql index name. */
+ @GridToStringExclude
+ private String sqlIdxName;
+
/**
*
*/
@@ -64,12 +68,15 @@ public class ValidateIndexesPartitionResult extends VisorDataTransferObject {
* @param size Size.
* @param isPrimary Is primary.
* @param consistentId Consistent id.
+ * @param sqlIdxName Sql index name (optional).
*/
- public ValidateIndexesPartitionResult(long updateCntr, long size, boolean isPrimary, Object consistentId) {
+ public ValidateIndexesPartitionResult(long updateCntr, long size, boolean isPrimary, Object consistentId,
+ String sqlIdxName) {
this.updateCntr = updateCntr;
this.size = size;
this.isPrimary = isPrimary;
this.consistentId = consistentId;
+ this.sqlIdxName = sqlIdxName;
}
/**
@@ -108,6 +115,13 @@ public class ValidateIndexesPartitionResult extends VisorDataTransferObject {
}
/**
+ * @return <code>null</code> for partition validation result, SQL index name for index validation result
+ */
+ public String sqlIndexName() {
+ return sqlIdxName;
+ }
+
+ /**
* @param t Issue.
* @return True if there are already enough issues.
*/
@@ -121,12 +135,18 @@ public class ValidateIndexesPartitionResult extends VisorDataTransferObject {
}
/** {@inheritDoc} */
+ @Override public byte getProtocolVersion() {
+ return V2;
+ }
+
+ /** {@inheritDoc} */
@Override protected void writeExternalData(ObjectOutput out) throws IOException {
out.writeLong(updateCntr);
out.writeLong(size);
out.writeBoolean(isPrimary);
out.writeObject(consistentId);
U.writeCollection(out, issues);
+ U.writeString(out, sqlIdxName);
}
/** {@inheritDoc} */
@@ -136,10 +156,15 @@ public class ValidateIndexesPartitionResult extends VisorDataTransferObject {
isPrimary = in.readBoolean();
consistentId = in.readObject();
issues = U.readList(in);
+
+ if (protoVer >= V2)
+ sqlIdxName = U.readString(in);
}
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(ValidateIndexesPartitionResult.class, this);
+ return sqlIdxName == null ? S.toString(ValidateIndexesPartitionResult.class, this) :
+ ValidateIndexesPartitionResult.class.getSimpleName() + " [consistentId=" + consistentId +
+ ", sqlIdxName=" + sqlIdxName + "]";
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/88a6bfdd/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesJobResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesJobResult.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesJobResult.java
index 25c97b6..aa74323 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesJobResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesJobResult.java
@@ -34,13 +34,19 @@ public class VisorValidateIndexesJobResult extends VisorDataTransferObject {
private static final long serialVersionUID = 0L;
/** Results of indexes validation from node. */
- private Map<PartitionKey, ValidateIndexesPartitionResult> res;
+ private Map<PartitionKey, ValidateIndexesPartitionResult> partRes;
+
+ /** Results of reverse indexes validation from node. */
+ private Map<String, ValidateIndexesPartitionResult> idxRes;
/**
- * @param res Results of indexes validation from node.
+ * @param partRes Results of indexes validation from node.
+ * @param idxRes Results of reverse indexes validation from node.
*/
- public VisorValidateIndexesJobResult(Map<PartitionKey, ValidateIndexesPartitionResult> res) {
- this.res = res;
+ public VisorValidateIndexesJobResult(Map<PartitionKey, ValidateIndexesPartitionResult> partRes,
+ Map<String, ValidateIndexesPartitionResult> idxRes) {
+ this.partRes = partRes;
+ this.idxRes = idxRes;
}
/**
@@ -49,21 +55,37 @@ public class VisorValidateIndexesJobResult extends VisorDataTransferObject {
public VisorValidateIndexesJobResult() {
}
+ /** {@inheritDoc} */
+ @Override public byte getProtocolVersion() {
+ return V2;
+ }
+
/**
* @return Results of indexes validation from node.
*/
- public Map<PartitionKey, ValidateIndexesPartitionResult> response() {
- return res;
+ public Map<PartitionKey, ValidateIndexesPartitionResult> partitionResult() {
+ return partRes;
+ }
+
+ /**
+ * @return Results of reverse indexes validation from node.
+ */
+ public Map<String, ValidateIndexesPartitionResult> indexResult() {
+ return idxRes;
}
/** {@inheritDoc} */
@Override protected void writeExternalData(ObjectOutput out) throws IOException {
- U.writeMap(out, res);
+ U.writeMap(out, partRes);
+ U.writeMap(out, idxRes);
}
/** {@inheritDoc} */
@Override protected void readExternalData(byte protoVer, ObjectInput in) throws IOException, ClassNotFoundException {
- res = U.readMap(in);
+ partRes = U.readMap(in);
+
+ if (protoVer >= V2)
+ idxRes = U.readMap(in);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/88a6bfdd/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java b/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java
index 373bd15..e0eff61 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java
@@ -31,9 +31,8 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteInterruptedException;
@@ -51,12 +50,15 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.verify.PartitionKey;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.processors.query.QueryTypeDescriptorImpl;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
import org.apache.ignite.internal.util.lang.GridIterator;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.resources.IgniteInstanceResource;
@@ -66,9 +68,13 @@ import org.h2.index.Cursor;
import org.h2.index.Index;
/**
- *
+ * Closure that locally validates indexes of given caches.
+ * Validation consists of three checks:
+ * 1. If entry is present in cache data tree, it's reachable from all cache SQL indexes
+ * 2. If entry is present in cache SQL index, it can be dereferenced with link from index
+ * 3. If entry is present in cache SQL index, it's present in cache data tree
*/
-public class ValidateIndexesClosure implements IgniteCallable<Map<PartitionKey, ValidateIndexesPartitionResult>> {
+public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndexesJobResult> {
/** */
private static final long serialVersionUID = 0L;
@@ -84,7 +90,19 @@ public class ValidateIndexesClosure implements IgniteCallable<Map<PartitionKey,
private Set<String> cacheNames;
/** Counter of processed partitions. */
- private final AtomicInteger completionCntr = new AtomicInteger(0);
+ private final AtomicInteger processedPartitions = new AtomicInteger(0);
+
+ /** Total partitions. */
+ private volatile int totalPartitions;
+
+ /** Counter of processed indexes. */
+ private final AtomicInteger processedIndexes = new AtomicInteger(0);
+
+ /** Total partitions. */
+ private volatile int totalIndexes;
+
+ /** Last progress print timestamp. */
+ private final AtomicLong lastProgressPrintTs = new AtomicLong(0);
/** Calculation executor. */
private volatile ExecutorService calcExecutor;
@@ -97,7 +115,7 @@ public class ValidateIndexesClosure implements IgniteCallable<Map<PartitionKey,
}
/** {@inheritDoc} */
- @Override public Map<PartitionKey, ValidateIndexesPartitionResult> call() throws Exception {
+ @Override public VisorValidateIndexesJobResult call() throws Exception {
calcExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
try {
@@ -111,7 +129,7 @@ public class ValidateIndexesClosure implements IgniteCallable<Map<PartitionKey,
/**
*
*/
- private Map<PartitionKey, ValidateIndexesPartitionResult> call0() throws Exception {
+ private VisorValidateIndexesJobResult call0() throws Exception {
Set<Integer> grpIds = new HashSet<>();
Set<String> missingCaches = new HashSet<>();
@@ -150,8 +168,9 @@ public class ValidateIndexesClosure implements IgniteCallable<Map<PartitionKey,
}
List<Future<Map<PartitionKey, ValidateIndexesPartitionResult>>> procPartFutures = new ArrayList<>();
-
- completionCntr.set(0);
+ List<Future<Map<String, ValidateIndexesPartitionResult>>> procIdxFutures = new ArrayList<>();
+ List<T2<CacheGroupContext, GridDhtLocalPartition>> partArgs = new ArrayList<>();
+ List<T2<GridCacheContext, Index>> idxArgs = new ArrayList<>();
for (Integer grpId : grpIds) {
CacheGroupContext grpCtx = ignite.context().cache().cacheGroup(grpId);
@@ -162,45 +181,82 @@ public class ValidateIndexesClosure implements IgniteCallable<Map<PartitionKey,
List<GridDhtLocalPartition> parts = grpCtx.topology().localPartitions();
for (GridDhtLocalPartition part : parts)
- procPartFutures.add(processPartitionAsync(grpCtx, part));
- }
+ partArgs.add(new T2<>(grpCtx, part));
- Map<PartitionKey, ValidateIndexesPartitionResult> res = new HashMap<>();
+ GridQueryProcessor qry = ignite.context().query();
- long lastProgressLogTs = U.currentTimeMillis();
+ IgniteH2Indexing indexing = (IgniteH2Indexing)qry.getIndexing();
- for (int i = 0; i < procPartFutures.size(); ) {
- Future<Map<PartitionKey, ValidateIndexesPartitionResult>> fut = procPartFutures.get(i);
+ for (GridCacheContext ctx : grpCtx.caches()) {
+ Collection<GridQueryTypeDescriptor> types = qry.types(ctx.name());
- try {
- Map<PartitionKey, ValidateIndexesPartitionResult> partRes = fut.get(1, TimeUnit.SECONDS);
+ if (!F.isEmpty(types)) {
+ for (GridQueryTypeDescriptor type : types) {
+ GridH2Table gridH2Tbl = indexing.dataTable(ctx.name(), type.tableName());
+
+ if (gridH2Tbl == null)
+ continue;
- res.putAll(partRes);
+ ArrayList<Index> indexes = gridH2Tbl.getIndexes();
- i++;
+ for (Index idx : indexes)
+ idxArgs.add(new T2<>(ctx, idx));
+ }
+ }
}
- catch (InterruptedException | ExecutionException e) {
- for (int j = i + 1; j < procPartFutures.size(); j++)
- procPartFutures.get(j).cancel(false);
-
- if (e instanceof InterruptedException)
- throw new IgniteInterruptedException((InterruptedException)e);
- else if (e.getCause() instanceof IgniteException)
- throw (IgniteException)e.getCause();
- else
- throw new IgniteException(e.getCause());
+ }
+
+ // To decrease contention on same indexes.
+ Collections.shuffle(partArgs);
+ Collections.shuffle(idxArgs);
+
+ for (T2<CacheGroupContext, GridDhtLocalPartition> t2 : partArgs)
+ procPartFutures.add(processPartitionAsync(t2.get1(), t2.get2()));
+
+ for (T2<GridCacheContext, Index> t2 : idxArgs)
+ procIdxFutures.add(processIndexAsync(t2.get1(), t2.get2()));
+
+ totalPartitions = procPartFutures.size();
+ totalIndexes = procIdxFutures.size();
+
+ Map<PartitionKey, ValidateIndexesPartitionResult> partResults = new HashMap<>();
+ Map<String, ValidateIndexesPartitionResult> idxResults = new HashMap<>();
+
+ int curPart = 0;
+ int curIdx = 0;
+ try {
+ for (; curPart < procPartFutures.size(); curPart++) {
+ Future<Map<PartitionKey, ValidateIndexesPartitionResult>> fut = procPartFutures.get(curPart);
+
+ Map<PartitionKey, ValidateIndexesPartitionResult> partRes = fut.get();
+
+ partResults.putAll(partRes);
}
- catch (TimeoutException ignored) {
- if (U.currentTimeMillis() - lastProgressLogTs > 60 * 1000L) {
- lastProgressLogTs = U.currentTimeMillis();
- log.warning("ValidateIndexesClosure is still running, processed " + completionCntr.get() + " of " +
- procPartFutures.size() + " local partitions");
- }
+ for (; curIdx < procIdxFutures.size(); curIdx++) {
+ Future<Map<String, ValidateIndexesPartitionResult>> fut = procIdxFutures.get(curIdx);
+
+ Map<String, ValidateIndexesPartitionResult> idxRes = fut.get();
+
+ idxResults.putAll(idxRes);
}
}
+ catch (InterruptedException | ExecutionException e) {
+ for (int j = curPart; j < procPartFutures.size(); j++)
+ procPartFutures.get(j).cancel(false);
+
+ for (int j = curIdx; j < procIdxFutures.size(); j++)
+ procIdxFutures.get(j).cancel(false);
+
+ if (e instanceof InterruptedException)
+ throw new IgniteInterruptedException((InterruptedException)e);
+ else if (e.getCause() instanceof IgniteException)
+ throw (IgniteException)e.getCause();
+ else
+ throw new IgniteException(e.getCause());
+ }
- return res;
+ return new VisorValidateIndexesJobResult(partResults, idxResults);
}
/**
@@ -245,12 +301,24 @@ public class ValidateIndexesClosure implements IgniteCallable<Map<PartitionKey,
boolean isPrimary = part.primary(grpCtx.topology().readyTopologyVersion());
- partRes = new ValidateIndexesPartitionResult(updateCntrBefore, partSize, isPrimary, consId);
+ partRes = new ValidateIndexesPartitionResult(updateCntrBefore, partSize, isPrimary, consId, null);
boolean enoughIssues = false;
- long keysProcessed = 0;
- long lastProgressLog = U.currentTimeMillis();
+ GridQueryProcessor qryProcessor = ignite.context().query();
+
+ Method m;
+ try {
+ m = GridQueryProcessor.class.getDeclaredMethod("typeByValue", String.class,
+ CacheObjectContext.class, KeyCacheObject.class, CacheObject.class, boolean.class);
+ }
+ catch (NoSuchMethodException e) {
+ log.error("Failed to invoke typeByValue", e);
+
+ throw new IgniteException(e);
+ }
+
+ m.setAccessible(true);
while (it.hasNextX()) {
if (enoughIssues)
@@ -266,14 +334,7 @@ public class ValidateIndexesClosure implements IgniteCallable<Map<PartitionKey,
if (cacheCtx == null)
throw new IgniteException("Unknown cacheId of CacheDataRow: " + cacheId);
- GridQueryProcessor qryProcessor = ignite.context().query();
-
try {
- Method m = GridQueryProcessor.class.getDeclaredMethod("typeByValue", String.class,
- CacheObjectContext.class, KeyCacheObject.class, CacheObject.class, boolean.class);
-
- m.setAccessible(true);
-
QueryTypeDescriptorImpl res = (QueryTypeDescriptorImpl)m.invoke(
qryProcessor, cacheCtx.name(), cacheCtx.cacheObjectContext(), row.key(), row.value(), true);
@@ -298,7 +359,7 @@ public class ValidateIndexesClosure implements IgniteCallable<Map<PartitionKey,
Cursor cursor = idx.find((Session) null, h2Row, h2Row);
if (cursor == null || !cursor.next())
- throw new IgniteCheckedException("Key not found.");
+ throw new IgniteCheckedException("Key is present in CacheDataTree, but can't be found in SQL index.");
}
catch (Throwable t) {
Object o = CacheObjectUtils.unwrapBinaryIfNeeded(
@@ -313,7 +374,7 @@ public class ValidateIndexesClosure implements IgniteCallable<Map<PartitionKey,
}
}
}
- catch (IllegalAccessException | NoSuchMethodException e) {
+ catch (IllegalAccessException e) {
log.error("Failed to invoke typeByValue", e);
throw new IgniteException(e);
@@ -325,16 +386,6 @@ public class ValidateIndexesClosure implements IgniteCallable<Map<PartitionKey,
throw new IgniteException(target);
}
- finally {
- keysProcessed++;
-
- if (U.currentTimeMillis() - lastProgressLog >= 60_000 && partSize > 0) {
- log.warning("Processing partition " + part.id() + " (" + (keysProcessed * 100 / partSize) +
- "% " + keysProcessed + "/" + partSize + ")");
-
- lastProgressLog = U.currentTimeMillis();
- }
- }
}
}
catch (IgniteCheckedException e) {
@@ -345,12 +396,107 @@ public class ValidateIndexesClosure implements IgniteCallable<Map<PartitionKey,
}
finally {
part.release();
+
+ printProgressIfNeeded();
}
PartitionKey partKey = new PartitionKey(grpCtx.groupId(), part.id(), grpCtx.cacheOrGroupName());
- completionCntr.incrementAndGet();
+ processedPartitions.incrementAndGet();
return Collections.singletonMap(partKey, partRes);
}
+
+ /**
+ *
+ */
+ private void printProgressIfNeeded() {
+ long curTs = U.currentTimeMillis();
+
+ long lastTs = lastProgressPrintTs.get();
+
+ if (curTs - lastTs >= 60_000 && lastProgressPrintTs.compareAndSet(lastTs, curTs)) {
+ log.warning("Current progress of ValidateIndexesClosure: processed " +
+ processedPartitions.get() + " of " + totalPartitions + " partitions, " +
+ processedIndexes.get() + " of " + totalIndexes + " SQL indexes");
+ }
+ }
+
+ /**
+ * @param ctx Context.
+ * @param idx Index.
+ */
+ private Future<Map<String, ValidateIndexesPartitionResult>> processIndexAsync(GridCacheContext ctx, Index idx) {
+ return calcExecutor.submit(new Callable<Map<String, ValidateIndexesPartitionResult>>() {
+ @Override public Map<String, ValidateIndexesPartitionResult> call() throws Exception {
+ return processIndex(ctx, idx);
+ }
+ });
+ }
+
+ /**
+ * @param ctx Context.
+ * @param idx Index.
+ */
+ private Map<String, ValidateIndexesPartitionResult> processIndex(GridCacheContext ctx, Index idx) {
+ Object consId = ignite.context().discovery().localNode().consistentId();
+
+ ValidateIndexesPartitionResult idxValidationRes = new ValidateIndexesPartitionResult(
+ -1, -1, true, consId, idx.getName());
+
+ boolean enoughIssues = false;
+
+ Cursor cursor = null;
+
+ try {
+ cursor = idx.find((Session)null, null, null);
+
+ if (cursor == null)
+ throw new IgniteCheckedException("Can't iterate through index: " + idx);
+ }
+ catch (Throwable t) {
+ IndexValidationIssue is = new IndexValidationIssue(null, ctx.name(), idx.getName(), t);
+
+ log.error("Find in index failed: " + is.toString());
+
+ enoughIssues = true;
+ }
+
+ while (!enoughIssues) {
+ KeyCacheObject h2key = null;
+
+ try {
+ if (!cursor.next())
+ break;
+
+ GridH2Row h2Row = (GridH2Row)cursor.get();
+
+ h2key = h2Row.key();
+
+ CacheDataRow cacheDataStoreRow = ctx.group().offheap().read(ctx, h2key);
+
+ if (cacheDataStoreRow == null)
+ throw new IgniteCheckedException("Key is present in SQL index, but can't be found in CacheDataTree.");
+ }
+ catch (Throwable t) {
+ Object o = CacheObjectUtils.unwrapBinaryIfNeeded(
+ ctx.cacheObjectContext(), h2key, true, true);
+
+ IndexValidationIssue is = new IndexValidationIssue(
+ String.valueOf(o), ctx.name(), idx.getName(), t);
+
+ log.error("Failed to lookup key: " + is.toString());
+
+ enoughIssues |= idxValidationRes.reportIssue(is);
+ }
+ }
+
+ String uniqueIdxName = "[cache=" + ctx.name() + ", idx=" + idx.getName() + "]";
+
+ processedIndexes.incrementAndGet();
+
+ printProgressIfNeeded();
+
+ return Collections.singletonMap(uniqueIdxName, idxValidationRes);
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/88a6bfdd/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesTask.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesTask.java b/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesTask.java
index 1a89c2c..52b48a5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesTask.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesTask.java
@@ -23,7 +23,6 @@ import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteException;
import org.apache.ignite.compute.ComputeJobResult;
-import org.apache.ignite.internal.processors.cache.verify.PartitionKey;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.visor.VisorJob;
@@ -81,10 +80,7 @@ public class VisorValidateIndexesTask extends VisorMultiNodeTask<VisorValidateIn
ignite.context().resource().injectGeneric(clo);
- Map<PartitionKey, ValidateIndexesPartitionResult> res = clo.call();
-
- return new VisorValidateIndexesJobResult(res);
-
+ return clo.call();
}
catch (Exception e) {
throw new IgniteException(e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/88a6bfdd/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
index bc99981..c896736 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.processors.cache.ttl.CacheTtlTransactionalLoca
import org.apache.ignite.internal.processors.cache.ttl.CacheTtlTransactionalPartitionedSelfTest;
import org.apache.ignite.internal.processors.client.IgniteDataStreamerTest;
import org.apache.ignite.internal.processors.query.h2.database.InlineIndexHelperTest;
+import org.apache.ignite.util.GridCommandHandlerIndexingTest;
/**
* Cache tests using indexing.
@@ -81,6 +82,8 @@ public class IgniteCacheWithIndexingTestSuite extends TestSuite {
suite.addTestSuite(IgniteDataStreamerTest.class);
+ suite.addTestSuite(GridCommandHandlerIndexingTest.class);
+
return suite;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/88a6bfdd/modules/indexing/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexingTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexingTest.java b/modules/indexing/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexingTest.java
index 9e9c777..62d3fc0 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexingTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexingTest.java
@@ -19,17 +19,32 @@ package org.apache.ignite.util;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
+import javax.cache.Cache;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.QueryIndex;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.tree.SearchRow;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.util.lang.GridIterator;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK;
@@ -38,36 +53,202 @@ import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK
*/
public class GridCommandHandlerIndexingTest extends GridCommandHandlerTest {
/**
- *
+ * Tests that validation doesn't fail if nothing is broken.
*/
- public void testValidateIndexes() throws Exception {
+ public void testValidateIndexesNoErrors() throws Exception {
Ignite ignite = startGrids(2);
ignite.cluster().active(true);
Ignite client = startGrid("client");
- IgniteCache<Integer, Person> personCache = client.getOrCreateCache(new CacheConfiguration<Integer, Person>()
- .setName("persons-cache-vi")
- .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
- .setAtomicityMode(CacheAtomicityMode.ATOMIC)
- .setBackups(1)
- .setQueryEntities(F.asList(personEntity(true, true)))
- .setAffinity(new RendezvousAffinityFunction(false, 32)));
+ String cacheName = "persons-cache-vi";
+
+ IgniteCache<Integer, Person> personCache = createPersonCache(client, cacheName);
ThreadLocalRandom rand = ThreadLocalRandom.current();
- for (int i = 0; i < 1000; i++)
+ for (int i = 0; i < 10_000; i++)
personCache.put(i, new Person(rand.nextInt(), String.valueOf(rand.nextLong())));
injectTestSystemOut();
- assertEquals(EXIT_CODE_OK, execute("--cache", "validate_indexes", "persons-cache-vi"));
+ assertEquals(EXIT_CODE_OK, execute("--cache", "validate_indexes", cacheName));
assertTrue(testOut.toString().contains("validate_indexes has finished, no issues found"));
}
/**
+ * Tests that missing rows in CacheDataTree are detected.
+ */
+ public void testBrokenCacheDataTreeShouldFailValidation() throws Exception {
+ Ignite ignite = startGrids(2);
+
+ ignite.cluster().active(true);
+
+ Ignite client = startGrid("client");
+
+ String cacheName = "persons-cache-vi";
+
+ IgniteCache<Integer, Person> personCache = createPersonCache(client, cacheName);
+
+ ThreadLocalRandom rand = ThreadLocalRandom.current();
+
+ for (int i = 0; i < 10_000; i++)
+ personCache.put(i, new Person(rand.nextInt(), String.valueOf(rand.nextLong())));
+
+ breakCacheDataTree(ignite, cacheName, 1);
+
+ injectTestSystemOut();
+
+ assertEquals(EXIT_CODE_OK, execute("--cache", "validate_indexes", cacheName));
+
+ assertTrue(testOut.toString().contains("validate_indexes has finished with errors"));
+ }
+
+ /**
+ * Tests that missing rows in H2 indexes are detected.
+ */
+ public void testBrokenSqlIndexShouldFailValidation() throws Exception {
+ Ignite ignite = startGrids(2);
+
+ ignite.cluster().active(true);
+
+ Ignite client = startGrid("client");
+
+ String cacheName = "persons-cache-vi";
+
+ IgniteCache<Integer, Person> personCache = createPersonCache(client, cacheName);
+
+ ThreadLocalRandom rand = ThreadLocalRandom.current();
+
+ for (int i = 0; i < 10_000; i++)
+ personCache.put(i, new Person(rand.nextInt(), String.valueOf(rand.nextLong())));
+
+ breakSqlIndex(ignite, cacheName);
+
+ injectTestSystemOut();
+
+ assertEquals(EXIT_CODE_OK, execute("--cache", "validate_indexes", cacheName));
+
+ assertTrue(testOut.toString().contains("validate_indexes has finished with errors"));
+ }
+
+ /**
+ * Removes some entries from a partition skipping index update. This effectively breaks the index.
+ */
+ private void breakCacheDataTree(Ignite ig, String cacheName, int partId) {
+ IgniteEx ig0 = (IgniteEx)ig;
+ int cacheId = CU.cacheId(cacheName);
+
+ ScanQuery scanQry = new ScanQuery(partId);
+
+ GridCacheContext<Object, Object> ctx = ig0.context().cache().context().cacheContext(cacheId);
+
+ // Get current update counter
+ String grpName = ig0.context().cache().context().cacheContext(cacheId).config().getGroupName();
+ int cacheGrpId = grpName == null ? cacheName.hashCode() : grpName.hashCode();
+
+ GridDhtLocalPartition locPart = ctx.dht().topology().localPartition(partId);
+ IgniteCacheOffheapManager.CacheDataStore dataStore = ig0.context().cache().context().cache().cacheGroup(cacheGrpId).offheap().dataStore(locPart);
+
+ Iterator<Cache.Entry> it = ig.cache(cacheName).withKeepBinary().query(scanQry).iterator();
+
+ for (int i = 0; i < 5_000; i++) {
+ if (it.hasNext()) {
+ Cache.Entry entry = it.next();
+
+ if (i % 5 == 0) {
+ // Do update
+ GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)ig0.context().cache().context().database();
+
+ db.checkpointReadLock();
+
+ try {
+ IgniteCacheOffheapManager.CacheDataStore innerStore = U.field(dataStore, "delegate");
+
+ // IgniteCacheOffheapManagerImpl.CacheDataRowStore
+ Object rowStore = U.field(innerStore, "rowStore");
+
+ // IgniteCacheOffheapManagerImpl.CacheDataTree
+ Object dataTree = U.field(innerStore, "dataTree");
+
+ CacheDataRow oldRow = U.invoke(
+ dataTree.getClass(),
+ dataTree,
+ "remove",
+ new SearchRow(cacheId, ctx.toCacheKeyObject(entry.getKey())));
+
+ if (oldRow != null)
+ U.invoke(rowStore.getClass(), rowStore, "removeRow", oldRow.link());
+ }
+ catch (IgniteCheckedException e) {
+ System.out.println("Failed to remove key skipping indexes: " + entry);
+
+ e.printStackTrace();
+ }
+ finally {
+ db.checkpointReadUnlock();
+ }
+ }
+ }
+ else {
+ System.out.println("Early exit for index corruption, keys processed: " + i);
+
+ break;
+ }
+ }
+ }
+
+ /**
+ * Removes some entries from H2 trees skipping partition updates. This effectively breaks the index.
+ */
+ private void breakSqlIndex(Ignite ig, String cacheName) throws Exception {
+ GridQueryProcessor qry = ((IgniteEx)ig).context().query();
+
+ GridCacheContext<Object, Object> ctx = ((IgniteEx)ig).cachex(cacheName).context();
+
+ GridDhtLocalPartition locPart = ctx.topology().localPartitions().get(0);
+
+ GridIterator<CacheDataRow> it = ctx.group().offheap().partitionIterator(locPart.id());
+
+ for (int i = 0; i < 500; i++) {
+ if (!it.hasNextX()) {
+ System.out.println("Early exit for index corruption, keys processed: " + i);
+
+ break;
+ }
+
+ CacheDataRow row = it.nextX();
+
+ ctx.shared().database().checkpointReadLock();
+
+ try {
+ qry.remove(ctx, row);
+ }
+ finally {
+ ctx.shared().database().checkpointReadUnlock();
+ }
+ }
+ }
+
+ /**
+ * Dynamically creates cache with SQL indexes.
+ *
+ * @param ig Client.
+ * @param cacheName Cache name.
+ */
+ private IgniteCache<Integer, Person> createPersonCache(Ignite ig, String cacheName) {
+ return ig.getOrCreateCache(new CacheConfiguration<Integer, Person>()
+ .setName(cacheName)
+ .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+ .setAtomicityMode(CacheAtomicityMode.ATOMIC)
+ .setBackups(1)
+ .setQueryEntities(F.asList(personEntity(true, true)))
+ .setAffinity(new RendezvousAffinityFunction(false, 32)));
+ }
+
+ /**
* @param idxName Index name.
* @param idxOrgId Index org id.
*/