You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dg...@apache.org on 2018/10/29 09:16:44 UTC
ignite git commit: IGNITE-9753 Several optimization of
validate_indexes - Fixes #5063.
Repository: ignite
Updated Branches:
refs/heads/master 594aac83c -> 1fe02df8d
IGNITE-9753 Several optimization of validate_indexes - Fixes #5063.
Signed-off-by: Dmitriy Govorukhin <dm...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1fe02df8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1fe02df8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1fe02df8
Branch: refs/heads/master
Commit: 1fe02df8daa94ca0761049cd41659f4ae18f580c
Parents: 594aac8
Author: Ivan Daschinskiy <iv...@gmail.com>
Authored: Mon Oct 29 12:14:54 2018 +0300
Committer: Dmitriy Govorukhin <dm...@gmail.com>
Committed: Mon Oct 29 12:14:54 2018 +0300
----------------------------------------------------------------------
.../internal/commandline/CommandHandler.java | 34 ++-
.../visor/verify/IndexIntegrityCheckIssue.java | 74 +++++++
.../verify/VisorValidateIndexesJobResult.java | 30 ++-
.../resources/META-INF/classnames.properties | 1 +
.../visor/verify/ValidateIndexesClosure.java | 205 ++++++++++++++++---
.../util/GridCommandHandlerIndexingTest.java | 152 ++++++++------
6 files changed, 398 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe02df8/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java
index 35981b5..c1853f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java
@@ -95,6 +95,7 @@ import org.apache.ignite.internal.visor.tx.VisorTxSortOrder;
import org.apache.ignite.internal.visor.tx.VisorTxTask;
import org.apache.ignite.internal.visor.tx.VisorTxTaskArg;
import org.apache.ignite.internal.visor.tx.VisorTxTaskResult;
+import org.apache.ignite.internal.visor.verify.IndexIntegrityCheckIssue;
import org.apache.ignite.internal.visor.verify.IndexValidationIssue;
import org.apache.ignite.internal.visor.verify.ValidateIndexesPartitionResult;
import org.apache.ignite.internal.visor.verify.VisorContentionTask;
@@ -780,9 +781,20 @@ public class CommandHandler {
}
}
- boolean errors = false;
-
for (Map.Entry<UUID, VisorValidateIndexesJobResult> nodeEntry : taskRes.results().entrySet()) {
+ boolean errors = false;
+
+ log("validate_indexes result on node " + nodeEntry.getKey() + ":");
+
+ Collection<IndexIntegrityCheckIssue> integrityCheckFailures = nodeEntry.getValue().integrityCheckFailures();
+
+ if (!integrityCheckFailures.isEmpty()) {
+ errors = true;
+
+ for (IndexIntegrityCheckIssue is : integrityCheckFailures)
+ log("\t" + is.toString());
+ }
+
Map<PartitionKey, ValidateIndexesPartitionResult> partRes = nodeEntry.getValue().partitionResult();
for (Map.Entry<PartitionKey, ValidateIndexesPartitionResult> e : partRes.entrySet()) {
@@ -791,10 +803,10 @@ public class CommandHandler {
if (!res.issues().isEmpty()) {
errors = true;
- log(e.getKey().toString() + " " + e.getValue().toString());
+ log("\t" + e.getKey().toString() + " " + e.getValue().toString());
for (IndexValidationIssue is : res.issues())
- log(is.toString());
+ log("\t\t" + is.toString());
}
}
@@ -806,18 +818,18 @@ public class CommandHandler {
if (!res.issues().isEmpty()) {
errors = true;
- log("SQL Index " + e.getKey() + " " + e.getValue().toString());
+ log("\tSQL Index " + e.getKey() + " " + e.getValue().toString());
for (IndexValidationIssue is : res.issues())
- log(is.toString());
+ log("\t\t" + is.toString());
}
}
- }
- if (!errors)
- log("validate_indexes has finished, no issues found.");
- else
- log("validate_indexes has finished with errors (listed above).");
+ if (!errors)
+ log("no issues found.\n");
+ else
+ log("issues found (listed above).\n");
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe02df8/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/IndexIntegrityCheckIssue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/IndexIntegrityCheckIssue.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/IndexIntegrityCheckIssue.java
new file mode 100644
index 0000000..ec6e5b2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/IndexIntegrityCheckIssue.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.verify;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.visor.VisorDataTransferObject;
+
+/**
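+ * Index integrity check failure for a cache group: the group name plus the error raised by the check.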
+ */
+public class IndexIntegrityCheckIssue extends VisorDataTransferObject {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Cache group name. */
+ private String grpName;
+
+ /** Data integrity check error. */
+ @GridToStringExclude
+ private Throwable t;
+
+ /**
+ *
+ */
+ public IndexIntegrityCheckIssue() {
+ // Default constructor required for Externalizable.
+ }
+
+ /**
+ * @param grpName Group name.
+ * @param t Data integrity check error.
+ */
+ public IndexIntegrityCheckIssue(String grpName, Throwable t) {
+ this.grpName = grpName;
+ this.t = t;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeExternalData(ObjectOutput out) throws IOException {
+ U.writeString(out, this.grpName);
+ out.writeObject(t);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void readExternalData(byte protoVer, ObjectInput in) throws IOException, ClassNotFoundException {
+ this.grpName = U.readString(in);
+ this.t = (Throwable)in.readObject();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IndexIntegrityCheckIssue.class, this) + ", " + t.getClass() + ": " + t.getMessage();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe02df8/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesJobResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesJobResult.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesJobResult.java
index aa74323..f84fc1a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesJobResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesJobResult.java
@@ -20,11 +20,14 @@ package org.apache.ignite.internal.visor.verify;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
+import java.util.Collection;
+import java.util.Collections;
import java.util.Map;
import org.apache.ignite.internal.processors.cache.verify.PartitionKey;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.visor.VisorDataTransferObject;
+import org.jetbrains.annotations.NotNull;
/**
*
@@ -39,14 +42,22 @@ public class VisorValidateIndexesJobResult extends VisorDataTransferObject {
/** Results of reverse indexes validation from node. */
private Map<String, ValidateIndexesPartitionResult> idxRes;
+ /** Integrity check issues. */
+ private Collection<IndexIntegrityCheckIssue> integrityCheckFailures;
+
/**
* @param partRes Results of indexes validation from node.
* @param idxRes Results of reverse indexes validation from node.
+ * @param integrityCheckFailures Collection of indexes integrity check failures.
*/
- public VisorValidateIndexesJobResult(Map<PartitionKey, ValidateIndexesPartitionResult> partRes,
- Map<String, ValidateIndexesPartitionResult> idxRes) {
+ public VisorValidateIndexesJobResult(
+ @NotNull Map<PartitionKey, ValidateIndexesPartitionResult> partRes,
+ @NotNull Map<String, ValidateIndexesPartitionResult> idxRes,
+ @NotNull Collection<IndexIntegrityCheckIssue> integrityCheckFailures
+ ) {
this.partRes = partRes;
this.idxRes = idxRes;
+ this.integrityCheckFailures = integrityCheckFailures;
}
/**
@@ -57,7 +68,7 @@ public class VisorValidateIndexesJobResult extends VisorDataTransferObject {
/** {@inheritDoc} */
@Override public byte getProtocolVersion() {
- return V2;
+ return V3;
}
/**
@@ -71,13 +82,21 @@ public class VisorValidateIndexesJobResult extends VisorDataTransferObject {
* @return Results of reverse indexes validation from node.
*/
public Map<String, ValidateIndexesPartitionResult> indexResult() {
- return idxRes;
+ return idxRes == null ? Collections.emptyMap() : idxRes;
+ }
+
+ /**
+ * @return Collection of failed integrity checks.
+ */
+ public Collection<IndexIntegrityCheckIssue> integrityCheckFailures() {
+ return integrityCheckFailures == null ? Collections.emptyList() : integrityCheckFailures;
}
/** {@inheritDoc} */
@Override protected void writeExternalData(ObjectOutput out) throws IOException {
U.writeMap(out, partRes);
U.writeMap(out, idxRes);
+ U.writeCollection(out, integrityCheckFailures);
}
/** {@inheritDoc} */
@@ -86,6 +105,9 @@ public class VisorValidateIndexesJobResult extends VisorDataTransferObject {
if (protoVer >= V2)
idxRes = U.readMap(in);
+
+ if (protoVer >= V3)
+ integrityCheckFailures = U.readCollection(in);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe02df8/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index a0e2ba7..fda0fe4 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -2206,6 +2206,7 @@ org.apache.ignite.internal.visor.util.VisorEventMapper
org.apache.ignite.internal.visor.util.VisorExceptionWrapper
org.apache.ignite.internal.visor.util.VisorTaskUtils$4
org.apache.ignite.internal.visor.verify.IndexValidationIssue
+org.apache.ignite.internal.visor.verify.IndexIntegrityCheckIssue
org.apache.ignite.internal.visor.verify.ValidateIndexesPartitionResult
org.apache.ignite.internal.visor.verify.VisorContentionJobResult
org.apache.ignite.internal.visor.verify.VisorContentionTask
http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe02df8/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java b/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java
index 503b57c..ec02c25 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java
@@ -18,6 +18,8 @@ package org.apache.ignite.internal.visor.verify;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -37,22 +39,29 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.pagemem.PageIdAllocator;
+import org.apache.ignite.internal.pagemem.PageIdUtils;
+import org.apache.ignite.internal.pagemem.store.PageStore;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.CacheObjectUtils;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.cache.verify.PartitionKey;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.processors.query.QueryTypeDescriptorImpl;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndex;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
@@ -104,9 +113,15 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
/** Counter of processed indexes. */
private final AtomicInteger processedIndexes = new AtomicInteger(0);
+ /** Counter of integrity checked indexes. */
+ private final AtomicInteger integrityCheckedIndexes = new AtomicInteger(0);
+
/** Total partitions. */
private volatile int totalIndexes;
+ /** Total cache groups. */
+ private volatile int totalCacheGrps;
+
/** Last progress print timestamp. */
private final AtomicLong lastProgressPrintTs = new AtomicLong(0);
@@ -182,10 +197,14 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
List<T2<CacheGroupContext, GridDhtLocalPartition>> partArgs = new ArrayList<>();
List<T2<GridCacheContext, Index>> idxArgs = new ArrayList<>();
+ totalCacheGrps = grpIds.size();
+
+ Map<Integer, IndexIntegrityCheckIssue> integrityCheckResults = integrityCheckIndexesPartitions(grpIds);
+
for (Integer grpId : grpIds) {
CacheGroupContext grpCtx = ignite.context().cache().cacheGroup(grpId);
- if (grpCtx == null)
+ if (grpCtx == null || integrityCheckResults.containsKey(grpId))
continue;
List<GridDhtLocalPartition> parts = grpCtx.topology().localPartitions();
@@ -210,7 +229,8 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
ArrayList<Index> indexes = gridH2Tbl.getIndexes();
for (Index idx : indexes)
- idxArgs.add(new T2<>(ctx, idx));
+ if (idx instanceof H2TreeIndex)
+ idxArgs.add(new T2<>(ctx, idx));
}
}
}
@@ -220,15 +240,15 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
Collections.shuffle(partArgs);
Collections.shuffle(idxArgs);
+ totalPartitions = partArgs.size();
+ totalIndexes = idxArgs.size();
+
for (T2<CacheGroupContext, GridDhtLocalPartition> t2 : partArgs)
procPartFutures.add(processPartitionAsync(t2.get1(), t2.get2()));
for (T2<GridCacheContext, Index> t2 : idxArgs)
procIdxFutures.add(processIndexAsync(t2.get1(), t2.get2()));
- totalPartitions = procPartFutures.size();
- totalIndexes = procIdxFutures.size();
-
Map<PartitionKey, ValidateIndexesPartitionResult> partResults = new HashMap<>();
Map<String, ValidateIndexesPartitionResult> idxResults = new HashMap<>();
@@ -250,6 +270,9 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
idxResults.putAll(idxRes);
}
+
+ log.warning("ValidateIndexesClosure finished: processed " + totalPartitions + " partitions and "
+ + totalIndexes + " indexes.");
}
catch (InterruptedException | ExecutionException e) {
for (int j = curPart; j < procPartFutures.size(); j++)
@@ -258,15 +281,102 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
for (int j = curIdx; j < procIdxFutures.size(); j++)
procIdxFutures.get(j).cancel(false);
- if (e instanceof InterruptedException)
- throw new IgniteInterruptedException((InterruptedException)e);
- else if (e.getCause() instanceof IgniteException)
- throw (IgniteException)e.getCause();
- else
- throw new IgniteException(e.getCause());
+ throw unwrapFutureException(e);
+ }
+
+ return new VisorValidateIndexesJobResult(partResults, idxResults, integrityCheckResults.values());
+ }
+
+ /**
+ * @param grpIds Group ids.
+ */
+ private Map<Integer, IndexIntegrityCheckIssue> integrityCheckIndexesPartitions(Set<Integer> grpIds) {
+ List<Future<T2<Integer, IndexIntegrityCheckIssue>>> integrityCheckFutures = new ArrayList<>(grpIds.size());
+
+ for (Integer grpId: grpIds) {
+ final CacheGroupContext grpCtx = ignite.context().cache().cacheGroup(grpId);
+
+ if (grpCtx == null) {
+ integrityCheckedIndexes.incrementAndGet();
+
+ continue;
+ }
+
+ Future<T2<Integer, IndexIntegrityCheckIssue>> checkFut =
+ calcExecutor.submit(new Callable<T2<Integer, IndexIntegrityCheckIssue>>() {
+ @Override public T2<Integer, IndexIntegrityCheckIssue> call() throws Exception {
+ IndexIntegrityCheckIssue issue = integrityCheckIndexPartition(grpCtx);
+
+ return new T2<>(grpCtx.groupId(), issue);
+ }
+ });
+
+ integrityCheckFutures.add(checkFut);
+ }
+
+ Map<Integer, IndexIntegrityCheckIssue> integrityCheckResults = new HashMap<>();
+
+ int curFut = 0;
+ try {
+ for (Future<T2<Integer, IndexIntegrityCheckIssue>> fut : integrityCheckFutures) {
+ T2<Integer, IndexIntegrityCheckIssue> res = fut.get();
+
+ if (res.getValue() != null)
+ integrityCheckResults.put(res.getKey(), res.getValue());
+ }
+ }
+ catch (InterruptedException | ExecutionException e) {
+ for (int j = curFut; j < integrityCheckFutures.size(); j++)
+ integrityCheckFutures.get(j).cancel(false);
+
+ throw unwrapFutureException(e);
+ }
+
+ return integrityCheckResults;
+ }
+
+ /**
+ * @param gctx Cache group context.
+ */
+ private IndexIntegrityCheckIssue integrityCheckIndexPartition(CacheGroupContext gctx) {
+ GridKernalContext ctx = ignite.context();
+ GridCacheSharedContext cctx = ctx.cache().context();
+
+ try {
+ FilePageStoreManager pageStoreMgr = (FilePageStoreManager)cctx.pageStore();
+
+ if (pageStoreMgr == null)
+ return null;
+
+ int pageSz = gctx.dataRegion().pageMemory().pageSize();
+
+ PageStore pageStore = pageStoreMgr.getStore(gctx.groupId(), PageIdAllocator.INDEX_PARTITION);
+
+ long pageId = PageIdUtils.pageId(PageIdAllocator.INDEX_PARTITION, PageIdAllocator.FLAG_IDX, 0);
+
+ ByteBuffer buf = ByteBuffer.allocateDirect(pageSz);
+
+ buf.order(ByteOrder.nativeOrder());
+
+ for (int pageNo = 0; pageNo < pageStore.pages(); pageId++, pageNo++) {
+ buf.clear();
+
+ pageStore.read(pageId, buf, true);
+ }
+
+ return null;
}
+ catch (Throwable t) {
+ log.error("Integrity check of index partition of cache group " + gctx.cacheOrGroupName() + " failed", t);
+
+ return new IndexIntegrityCheckIssue(gctx.cacheOrGroupName(), t);
+ }
+ finally {
+ integrityCheckedIndexes.incrementAndGet();
- return new VisorValidateIndexesJobResult(partResults, idxResults);
+ printProgressIfNeeded("Current progress of ValidateIndexesClosure: checked integrity of "
+ + integrityCheckedIndexes.get() + " index partitions of " + totalCacheGrps + " cache groups");
+ }
}
/**
@@ -371,6 +481,19 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
if (cacheCtx == null)
throw new IgniteException("Unknown cacheId of CacheDataRow: " + cacheId);
+ if (row.link() == 0L) {
+ String errMsg = "Invalid partition row, possibly deleted";
+
+ log.error(errMsg);
+
+ IndexValidationIssue is = new IndexValidationIssue(null, cacheCtx.name(), null,
+ new IgniteCheckedException(errMsg));
+
+ enoughIssues |= partRes.reportIssue(is);
+
+ continue;
+ }
+
try {
QueryTypeDescriptorImpl res = (QueryTypeDescriptorImpl)m.invoke(
qryProcessor, cacheCtx.name(), cacheCtx.cacheObjectContext(), row.key(), row.value(), true);
@@ -392,6 +515,9 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
ArrayList<Index> indexes = gridH2Tbl.getIndexes();
for (Index idx : indexes) {
+ if (!(idx instanceof H2TreeIndex))
+ continue;
+
try {
Cursor cursor = idx.find((Session) null, h2Row, h2Row);
@@ -434,7 +560,7 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
finally {
part.release();
- printProgressIfNeeded();
+ printProgressOfIndexValidationIfNeeded();
}
PartitionKey partKey = new PartitionKey(grpCtx.groupId(), part.id(), grpCtx.cacheOrGroupName());
@@ -447,16 +573,21 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
/**
*
*/
- private void printProgressIfNeeded() {
- long curTs = U.currentTimeMillis();
+ private void printProgressOfIndexValidationIfNeeded() {
+ printProgressIfNeeded("Current progress of ValidateIndexesClosure: processed " +
+ processedPartitions.get() + " of " + totalPartitions + " partitions, " +
+ processedIndexes.get() + " of " + totalIndexes + " SQL indexes");
+ }
+ /**
+ * @param msg Progress message; logged as a warning at most once per 60 seconds.
+ */
+ private void printProgressIfNeeded(String msg) {
+ long curTs = U.currentTimeMillis();
long lastTs = lastProgressPrintTs.get();
- if (curTs - lastTs >= 60_000 && lastProgressPrintTs.compareAndSet(lastTs, curTs)) {
- log.warning("Current progress of ValidateIndexesClosure: processed " +
- processedPartitions.get() + " of " + totalPartitions + " partitions, " +
- processedIndexes.get() + " of " + totalIndexes + " SQL indexes");
- }
+ if (curTs - lastTs >= 60_000 && lastProgressPrintTs.compareAndSet(lastTs, curTs))
+ log.warning(msg);
}
/**
@@ -546,12 +677,14 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
h2key = h2Row.key();
- CacheDataRow cacheDataStoreRow = ctx.group().offheap().read(ctx, h2key);
+ if (h2Row.link() != 0L) {
+ CacheDataRow cacheDataStoreRow = ctx.group().offheap().read(ctx, h2key);
- if (cacheDataStoreRow == null)
- throw new IgniteCheckedException("Key is present in SQL index, but can't be found in CacheDataTree.");
-
- previousKey = h2key;
+ if (cacheDataStoreRow == null)
+ throw new IgniteCheckedException("Key is present in SQL index, but can't be found in CacheDataTree.");
+ }
+ else
+ throw new IgniteCheckedException("Invalid index row, possibly deleted " + h2Row);
}
catch (Throwable t) {
Object o = CacheObjectUtils.unwrapBinaryIfNeeded(
@@ -564,14 +697,34 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
enoughIssues |= idxValidationRes.reportIssue(is);
}
+ finally {
+ previousKey = h2key;
+ }
}
String uniqueIdxName = "[cache=" + ctx.name() + ", idx=" + idx.getName() + "]";
processedIndexes.incrementAndGet();
- printProgressIfNeeded();
+ printProgressOfIndexValidationIfNeeded();
return Collections.singletonMap(uniqueIdxName, idxValidationRes);
}
+
+ /**
+ * @param e Future result exception.
+ * @return Unwrapped exception.
+ */
+ private IgniteException unwrapFutureException(Exception e) {
+ assert e instanceof InterruptedException || e instanceof ExecutionException : "Expecting either InterruptedException " +
+ "or ExecutionException";
+
+ if (e instanceof InterruptedException)
+ return new IgniteInterruptedException((InterruptedException)e);
+ else if (e.getCause() instanceof IgniteException)
+ return (IgniteException)e.getCause();
+ else
+ return new IgniteException(e.getCause());
+ }
+
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe02df8/modules/indexing/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexingTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexingTest.java b/modules/indexing/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexingTest.java
index 48e94c1..c7693d2 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexingTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexingTest.java
@@ -17,6 +17,9 @@
package org.apache.ignite.util;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
@@ -24,8 +27,8 @@ import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import javax.cache.Cache;
import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.QueryEntity;
@@ -39,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.cache.tree.SearchRow;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.util.lang.GridIterator;
@@ -47,57 +51,35 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.INDEX_FILE_NAME;
/**
*
*/
public class GridCommandHandlerIndexingTest extends GridCommandHandlerTest {
+ /** Test cache name. */
+ private static final String CACHE_NAME = "persons-cache-vi";
+
/**
* Tests that validation doesn't fail if nothing is broken.
*/
public void testValidateIndexesNoErrors() throws Exception {
- Ignite ignite = startGrids(2);
-
- ignite.cluster().active(true);
-
- Ignite client = startGrid("client");
-
- String cacheName = "persons-cache-vi";
-
- IgniteCache<Integer, Person> personCache = createPersonCache(client, cacheName);
-
- ThreadLocalRandom rand = ThreadLocalRandom.current();
-
- for (int i = 0; i < 10_000; i++)
- personCache.put(i, new Person(rand.nextInt(), String.valueOf(rand.nextLong())));
+ prepareGridForTest();
injectTestSystemOut();
- assertEquals(EXIT_CODE_OK, execute("--cache", "validate_indexes", cacheName));
+ assertEquals(EXIT_CODE_OK, execute("--cache", "validate_indexes", CACHE_NAME));
- assertTrue(testOut.toString().contains("validate_indexes has finished, no issues found"));
+ assertTrue(testOut.toString().contains("no issues found"));
}
/**
* Tests that missing rows in CacheDataTree are detected.
*/
public void testBrokenCacheDataTreeShouldFailValidation() throws Exception {
- Ignite ignite = startGrids(2);
+ Ignite ignite = prepareGridForTest();
- ignite.cluster().active(true);
-
- Ignite client = startGrid("client");
-
- String cacheName = "persons-cache-vi";
-
- IgniteCache<Integer, Person> personCache = createPersonCache(client, cacheName);
-
- ThreadLocalRandom rand = ThreadLocalRandom.current();
-
- for (int i = 0; i < 10_000; i++)
- personCache.put(i, new Person(rand.nextInt(), String.valueOf(rand.nextLong())));
-
- breakCacheDataTree(ignite, cacheName, 1);
+ breakCacheDataTree(ignite, CACHE_NAME, 1);
injectTestSystemOut();
@@ -105,11 +87,11 @@ public class GridCommandHandlerIndexingTest extends GridCommandHandlerTest {
execute(
"--cache",
"validate_indexes",
- cacheName,
+ CACHE_NAME,
"checkFirst", "10000",
"checkThrough", "10"));
- assertTrue(testOut.toString().contains("validate_indexes has finished with errors"));
+ assertTrue(testOut.toString().contains("issues found (listed above)"));
assertTrue(testOut.toString().contains(
"Key is present in SQL index, but is missing in corresponding data page."));
@@ -119,6 +101,46 @@ public class GridCommandHandlerIndexingTest extends GridCommandHandlerTest {
* Tests that missing rows in H2 indexes are detected.
*/
public void testBrokenSqlIndexShouldFailValidation() throws Exception {
+ Ignite ignite = prepareGridForTest();
+
+ breakSqlIndex(ignite, CACHE_NAME);
+
+ injectTestSystemOut();
+
+ assertEquals(EXIT_CODE_OK, execute("--cache", "validate_indexes", CACHE_NAME));
+
+ assertTrue(testOut.toString().contains("issues found (listed above)"));
+ }
+
+ /**
+ * Tests that a corrupted index partition file is detected.
+ */
+ public void testCorruptedIndexPartitionShouldFailValidation() throws Exception {
+ Ignite ignite = prepareGridForTest();
+
+ forceCheckpoint();
+
+ File idxPath = indexPartition(ignite, CACHE_NAME);
+
+ stopAllGrids();
+
+ corruptIndexPartition(idxPath);
+
+ startGrids(2);
+
+ awaitPartitionMapExchange();
+
+ injectTestSystemOut();
+
+ assertEquals(EXIT_CODE_OK, execute("--cache", "validate_indexes", CACHE_NAME));
+
+ assertTrue(testOut.toString().contains("issues found (listed above)"));
+ }
+
+ /**
+ *
+ */
+ private Ignite prepareGridForTest() throws Exception{
Ignite ignite = startGrids(2);
ignite.cluster().active(true);
@@ -127,20 +149,52 @@ public class GridCommandHandlerIndexingTest extends GridCommandHandlerTest {
String cacheName = "persons-cache-vi";
- IgniteCache<Integer, Person> personCache = createPersonCache(client, cacheName);
+ client.getOrCreateCache(new CacheConfiguration<Integer, Person>()
+ .setName(cacheName)
+ .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+ .setAtomicityMode(CacheAtomicityMode.ATOMIC)
+ .setBackups(1)
+ .setQueryEntities(F.asList(personEntity(true, true)))
+ .setAffinity(new RendezvousAffinityFunction(false, 32)));
ThreadLocalRandom rand = ThreadLocalRandom.current();
- for (int i = 0; i < 10_000; i++)
- personCache.put(i, new Person(rand.nextInt(), String.valueOf(rand.nextLong())));
+ try (IgniteDataStreamer<Integer, Person> streamer = client.dataStreamer(CACHE_NAME);) {
+ for (int i = 0; i < 10_000; i++)
+ streamer.addData(i, new Person(rand.nextInt(), String.valueOf(rand.nextLong())));
+ }
- breakSqlIndex(ignite, cacheName);
+ return ignite;
+ }
- injectTestSystemOut();
+ /**
+ * Get index partition file for specific node and cache.
+ */
+ private File indexPartition(Ignite ig, String cacheName) {
+ IgniteEx ig0 = (IgniteEx)ig;
- assertEquals(EXIT_CODE_OK, execute("--cache", "validate_indexes", cacheName));
+ FilePageStoreManager pageStoreManager = ((FilePageStoreManager) ig0.context().cache().context().pageStore());
- assertTrue(testOut.toString().contains("validate_indexes has finished with errors"));
+ return new File(pageStoreManager.cacheWorkDir(false, cacheName), INDEX_FILE_NAME);
+ }
+
+ /**
+ * Write some random trash in index partition.
+ */
+ private void corruptIndexPartition(File path) throws IOException {
+ assertTrue(path.exists());
+
+ ThreadLocalRandom rand = ThreadLocalRandom.current();
+
+ try (RandomAccessFile idx = new RandomAccessFile(path, "rw")) {
+ byte[] trash = new byte[1024];
+
+ rand.nextBytes(trash);
+
+ idx.seek(4096);
+
+ idx.write(trash);
+ }
}
/**
@@ -242,22 +296,6 @@ public class GridCommandHandlerIndexingTest extends GridCommandHandlerTest {
}
/**
- * Dynamically creates cache with SQL indexes.
- *
- * @param ig Client.
- * @param cacheName Cache name.
- */
- private IgniteCache<Integer, Person> createPersonCache(Ignite ig, String cacheName) {
- return ig.getOrCreateCache(new CacheConfiguration<Integer, Person>()
- .setName(cacheName)
- .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
- .setAtomicityMode(CacheAtomicityMode.ATOMIC)
- .setBackups(1)
- .setQueryEntities(F.asList(personEntity(true, true)))
- .setAffinity(new RendezvousAffinityFunction(false, 32)));
- }
-
- /**
* @param idxName Index name.
* @param idxOrgId Index org id.
*/