You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dg...@apache.org on 2018/10/29 09:16:44 UTC

ignite git commit: IGNITE-9753 Several optimization of validate_indexes - Fixes #5063.

Repository: ignite
Updated Branches:
  refs/heads/master 594aac83c -> 1fe02df8d


IGNITE-9753 Several optimization of validate_indexes - Fixes #5063.

Signed-off-by: Dmitriy Govorukhin <dm...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1fe02df8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1fe02df8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1fe02df8

Branch: refs/heads/master
Commit: 1fe02df8daa94ca0761049cd41659f4ae18f580c
Parents: 594aac8
Author: Ivan Daschinskiy <iv...@gmail.com>
Authored: Mon Oct 29 12:14:54 2018 +0300
Committer: Dmitriy Govorukhin <dm...@gmail.com>
Committed: Mon Oct 29 12:14:54 2018 +0300

----------------------------------------------------------------------
 .../internal/commandline/CommandHandler.java    |  34 ++-
 .../visor/verify/IndexIntegrityCheckIssue.java  |  74 +++++++
 .../verify/VisorValidateIndexesJobResult.java   |  30 ++-
 .../resources/META-INF/classnames.properties    |   1 +
 .../visor/verify/ValidateIndexesClosure.java    | 205 ++++++++++++++++---
 .../util/GridCommandHandlerIndexingTest.java    | 152 ++++++++------
 6 files changed, 398 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe02df8/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java
index 35981b5..c1853f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java
@@ -95,6 +95,7 @@ import org.apache.ignite.internal.visor.tx.VisorTxSortOrder;
 import org.apache.ignite.internal.visor.tx.VisorTxTask;
 import org.apache.ignite.internal.visor.tx.VisorTxTaskArg;
 import org.apache.ignite.internal.visor.tx.VisorTxTaskResult;
+import org.apache.ignite.internal.visor.verify.IndexIntegrityCheckIssue;
 import org.apache.ignite.internal.visor.verify.IndexValidationIssue;
 import org.apache.ignite.internal.visor.verify.ValidateIndexesPartitionResult;
 import org.apache.ignite.internal.visor.verify.VisorContentionTask;
@@ -780,9 +781,20 @@ public class CommandHandler {
             }
         }
 
-        boolean errors = false;
-
         for (Map.Entry<UUID, VisorValidateIndexesJobResult> nodeEntry : taskRes.results().entrySet()) {
+            boolean errors = false;
+
+            log("validate_indexes result on node " + nodeEntry.getKey() + ":");
+
+            Collection<IndexIntegrityCheckIssue> integrityCheckFailures = nodeEntry.getValue().integrityCheckFailures();
+
+            if (!integrityCheckFailures.isEmpty()) {
+                errors = true;
+
+                for (IndexIntegrityCheckIssue is : integrityCheckFailures)
+                    log("\t" + is.toString());
+            }
+
             Map<PartitionKey, ValidateIndexesPartitionResult> partRes = nodeEntry.getValue().partitionResult();
 
             for (Map.Entry<PartitionKey, ValidateIndexesPartitionResult> e : partRes.entrySet()) {
@@ -791,10 +803,10 @@ public class CommandHandler {
                 if (!res.issues().isEmpty()) {
                     errors = true;
 
-                    log(e.getKey().toString() + " " + e.getValue().toString());
+                    log("\t" + e.getKey().toString() + " " + e.getValue().toString());
 
                     for (IndexValidationIssue is : res.issues())
-                        log(is.toString());
+                        log("\t\t" + is.toString());
                 }
             }
 
@@ -806,18 +818,18 @@ public class CommandHandler {
                 if (!res.issues().isEmpty()) {
                     errors = true;
 
-                    log("SQL Index " + e.getKey() + " " + e.getValue().toString());
+                    log("\tSQL Index " + e.getKey() + " " + e.getValue().toString());
 
                     for (IndexValidationIssue is : res.issues())
-                        log(is.toString());
+                        log("\t\t" + is.toString());
                 }
             }
-        }
 
-        if (!errors)
-            log("validate_indexes has finished, no issues found.");
-        else
-            log("validate_indexes has finished with errors (listed above).");
+            if (!errors)
+                log("no issues found.\n");
+            else
+                log("issues found (listed above).\n");
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe02df8/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/IndexIntegrityCheckIssue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/IndexIntegrityCheckIssue.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/IndexIntegrityCheckIssue.java
new file mode 100644
index 0000000..ec6e5b2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/IndexIntegrityCheckIssue.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.verify;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.visor.VisorDataTransferObject;
+
+/**
+ * Failure of the integrity check of a cache group's index partition.
+ */
+public class IndexIntegrityCheckIssue extends VisorDataTransferObject {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Cache group name. */
+    private String grpName;
+
+    /** Data integrity check error; appended explicitly by {@link #toString()}. */
+    @GridToStringExclude
+    private Throwable t;
+
+    /** Default constructor required for Externalizable. */
+    public IndexIntegrityCheckIssue() {
+        // No-op.
+    }
+
+    /**
+     * @param grpName Group name.
+     * @param t Data integrity check error.
+     */
+    public IndexIntegrityCheckIssue(String grpName, Throwable t) {
+        this.grpName = grpName;
+        this.t = t;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void writeExternalData(ObjectOutput out) throws IOException {
+        U.writeString(out, this.grpName);
+        out.writeObject(t);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void readExternalData(byte protoVer, ObjectInput in) throws IOException, ClassNotFoundException {
+        this.grpName = U.readString(in);
+        this.t = (Throwable)in.readObject();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        String res = S.toString(IndexIntegrityCheckIssue.class, this);
+        // The error may be absent (e.g. instance created via the no-arg constructor): do not fail with NPE.
+        return t == null ? res : res + ", " + t.getClass() + ": " + t.getMessage();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe02df8/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesJobResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesJobResult.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesJobResult.java
index aa74323..f84fc1a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesJobResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesJobResult.java
@@ -20,11 +20,14 @@ package org.apache.ignite.internal.visor.verify;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 import org.apache.ignite.internal.processors.cache.verify.PartitionKey;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.visor.VisorDataTransferObject;
+import org.jetbrains.annotations.NotNull;
 
 /**
  *
@@ -39,14 +42,22 @@ public class VisorValidateIndexesJobResult extends VisorDataTransferObject {
     /** Results of reverse indexes validation from node. */
     private Map<String, ValidateIndexesPartitionResult> idxRes;
 
+    /** Integrity check issues. */
+    private Collection<IndexIntegrityCheckIssue> integrityCheckFailures;
+
     /**
      * @param partRes Results of indexes validation from node.
      * @param idxRes Results of reverse indexes validation from node.
+     * @param integrityCheckFailures Collection of indexes integrity check failures.
      */
-    public VisorValidateIndexesJobResult(Map<PartitionKey, ValidateIndexesPartitionResult> partRes,
-        Map<String, ValidateIndexesPartitionResult> idxRes) {
+    public VisorValidateIndexesJobResult(
+            @NotNull Map<PartitionKey, ValidateIndexesPartitionResult> partRes,
+            @NotNull Map<String, ValidateIndexesPartitionResult> idxRes,
+            @NotNull Collection<IndexIntegrityCheckIssue> integrityCheckFailures
+    ) {
         this.partRes = partRes;
         this.idxRes = idxRes;
+        this.integrityCheckFailures = integrityCheckFailures;
     }
 
     /**
@@ -57,7 +68,7 @@ public class VisorValidateIndexesJobResult extends VisorDataTransferObject {
 
     /** {@inheritDoc} */
     @Override public byte getProtocolVersion() {
-        return V2;
+        return V3;
     }
 
     /**
@@ -71,13 +82,21 @@ public class VisorValidateIndexesJobResult extends VisorDataTransferObject {
      * @return Results of reverse indexes validation from node.
      */
     public Map<String, ValidateIndexesPartitionResult> indexResult() {
-        return idxRes;
+        return idxRes == null ? Collections.emptyMap() : idxRes;
+    }
+
+    /**
+     * @return Collection of failed integrity checks.
+     */
+    public Collection<IndexIntegrityCheckIssue> integrityCheckFailures() {
+        return integrityCheckFailures == null ? Collections.emptyList() : integrityCheckFailures;
     }
 
     /** {@inheritDoc} */
     @Override protected void writeExternalData(ObjectOutput out) throws IOException {
         U.writeMap(out, partRes);
         U.writeMap(out, idxRes);
+        U.writeCollection(out, integrityCheckFailures);
     }
 
     /** {@inheritDoc} */
@@ -86,6 +105,9 @@ public class VisorValidateIndexesJobResult extends VisorDataTransferObject {
 
         if (protoVer >= V2)
             idxRes = U.readMap(in);
+
+        if (protoVer >= V3)
+            integrityCheckFailures = U.readCollection(in);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe02df8/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index a0e2ba7..fda0fe4 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -2206,6 +2206,7 @@ org.apache.ignite.internal.visor.util.VisorEventMapper
 org.apache.ignite.internal.visor.util.VisorExceptionWrapper
 org.apache.ignite.internal.visor.util.VisorTaskUtils$4
 org.apache.ignite.internal.visor.verify.IndexValidationIssue
+org.apache.ignite.internal.visor.verify.IndexIntegrityCheckIssue
 org.apache.ignite.internal.visor.verify.ValidateIndexesPartitionResult
 org.apache.ignite.internal.visor.verify.VisorContentionJobResult
 org.apache.ignite.internal.visor.verify.VisorContentionTask

http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe02df8/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java b/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java
index 503b57c..ec02c25 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java
@@ -18,6 +18,8 @@ package org.apache.ignite.internal.visor.verify;
 
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -37,22 +39,29 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.pagemem.PageIdAllocator;
+import org.apache.ignite.internal.pagemem.PageIdUtils;
+import org.apache.ignite.internal.pagemem.store.PageStore;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheObjectContext;
 import org.apache.ignite.internal.processors.cache.CacheObjectUtils;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
 import org.apache.ignite.internal.processors.cache.verify.PartitionKey;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.processors.query.QueryTypeDescriptorImpl;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndex;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
@@ -104,9 +113,15 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
     /** Counter of processed indexes. */
     private final AtomicInteger processedIndexes = new AtomicInteger(0);
 
+    /** Counter of integrity checked indexes. */
+    private final AtomicInteger integrityCheckedIndexes = new AtomicInteger(0);
+
     /** Total partitions. */
     private volatile int totalIndexes;
 
+    /** Total cache groups. */
+    private volatile  int totalCacheGrps;
+
     /** Last progress print timestamp. */
     private final AtomicLong lastProgressPrintTs = new AtomicLong(0);
 
@@ -182,10 +197,14 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
         List<T2<CacheGroupContext, GridDhtLocalPartition>> partArgs = new ArrayList<>();
         List<T2<GridCacheContext, Index>> idxArgs = new ArrayList<>();
 
+        totalCacheGrps = grpIds.size();
+
+        Map<Integer, IndexIntegrityCheckIssue> integrityCheckResults = integrityCheckIndexesPartitions(grpIds);
+
         for (Integer grpId : grpIds) {
             CacheGroupContext grpCtx = ignite.context().cache().cacheGroup(grpId);
 
-            if (grpCtx == null)
+            if (grpCtx == null || integrityCheckResults.containsKey(grpId))
                 continue;
 
             List<GridDhtLocalPartition> parts = grpCtx.topology().localPartitions();
@@ -210,7 +229,8 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
                         ArrayList<Index> indexes = gridH2Tbl.getIndexes();
 
                         for (Index idx : indexes)
-                            idxArgs.add(new T2<>(ctx, idx));
+                            if (idx instanceof H2TreeIndex)
+                                idxArgs.add(new T2<>(ctx, idx));
                     }
                 }
             }
@@ -220,15 +240,15 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
         Collections.shuffle(partArgs);
         Collections.shuffle(idxArgs);
 
+        totalPartitions = partArgs.size();
+        totalIndexes = idxArgs.size();
+
         for (T2<CacheGroupContext, GridDhtLocalPartition> t2 : partArgs)
             procPartFutures.add(processPartitionAsync(t2.get1(), t2.get2()));
 
         for (T2<GridCacheContext, Index> t2 : idxArgs)
             procIdxFutures.add(processIndexAsync(t2.get1(), t2.get2()));
 
-        totalPartitions = procPartFutures.size();
-        totalIndexes = procIdxFutures.size();
-
         Map<PartitionKey, ValidateIndexesPartitionResult> partResults = new HashMap<>();
         Map<String, ValidateIndexesPartitionResult> idxResults = new HashMap<>();
 
@@ -250,6 +270,9 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
 
                 idxResults.putAll(idxRes);
             }
+
+            log.warning("ValidateIndexesClosure finished: processed " + totalPartitions + " partitions and "
+                    + totalIndexes + " indexes.");
         }
         catch (InterruptedException | ExecutionException e) {
             for (int j = curPart; j < procPartFutures.size(); j++)
@@ -258,15 +281,102 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
             for (int j = curIdx; j < procIdxFutures.size(); j++)
                 procIdxFutures.get(j).cancel(false);
 
-            if (e instanceof InterruptedException)
-                throw new IgniteInterruptedException((InterruptedException)e);
-            else if (e.getCause() instanceof IgniteException)
-                throw (IgniteException)e.getCause();
-            else
-                throw new IgniteException(e.getCause());
+            throw unwrapFutureException(e);
+        }
+
+        return new VisorValidateIndexesJobResult(partResults, idxResults, integrityCheckResults.values());
+    }
+
+    /**
+     * Submits an index partition integrity check per cache group to {@code calcExecutor} and awaits all of them.
+     * @param grpIds Group ids.
+     * @return Integrity check issues keyed by cache group id; groups whose check found no issue are absent.
+     */
+    private Map<Integer, IndexIntegrityCheckIssue> integrityCheckIndexesPartitions(Set<Integer> grpIds) {
+        List<Future<T2<Integer, IndexIntegrityCheckIssue>>> integrityCheckFutures = new ArrayList<>(grpIds.size());
+
+        for (Integer grpId: grpIds) {
+            final CacheGroupContext grpCtx = ignite.context().cache().cacheGroup(grpId);
+
+            if (grpCtx == null) {
+                integrityCheckedIndexes.incrementAndGet();
+
+                continue;
+            }
+
+            integrityCheckFutures.add(calcExecutor.submit(new Callable<T2<Integer, IndexIntegrityCheckIssue>>() {
+                @Override public T2<Integer, IndexIntegrityCheckIssue> call() throws Exception {
+                    IndexIntegrityCheckIssue issue = integrityCheckIndexPartition(grpCtx);
+
+                    return new T2<>(grpCtx.groupId(), issue);
+                }
+            }));
+        }
+
+        Map<Integer, IndexIntegrityCheckIssue> integrityCheckResults = new HashMap<>();
+
+        // Index of the future currently awaited: on failure, it and all later futures are cancelled.
+        int curFut = 0;
+        try {
+            for (; curFut < integrityCheckFutures.size(); curFut++) {
+                T2<Integer, IndexIntegrityCheckIssue> res = integrityCheckFutures.get(curFut).get();
+
+                if (res.getValue() != null)
+                    integrityCheckResults.put(res.getKey(), res.getValue());
+            }
+        }
+        catch (InterruptedException | ExecutionException e) {
+            for (int j = curFut; j < integrityCheckFutures.size(); j++)
+                integrityCheckFutures.get(j).cancel(false);
+
+            throw unwrapFutureException(e);
+        }
+
+        return integrityCheckResults;
+    }
+
+    /**
+     * @param gctx Cache group context.
+     */
+    private IndexIntegrityCheckIssue integrityCheckIndexPartition(CacheGroupContext gctx) {
+        GridKernalContext ctx = ignite.context();
+        GridCacheSharedContext cctx = ctx.cache().context();
+
+        try {
+            FilePageStoreManager pageStoreMgr = (FilePageStoreManager)cctx.pageStore();
+
+            if (pageStoreMgr == null)
+                return null;
+
+            int pageSz = gctx.dataRegion().pageMemory().pageSize();
+
+            PageStore pageStore = pageStoreMgr.getStore(gctx.groupId(), PageIdAllocator.INDEX_PARTITION);
+
+            long pageId = PageIdUtils.pageId(PageIdAllocator.INDEX_PARTITION, PageIdAllocator.FLAG_IDX, 0);
+
+            ByteBuffer buf = ByteBuffer.allocateDirect(pageSz);
+
+            buf.order(ByteOrder.nativeOrder());
+
+            for (int pageNo = 0; pageNo < pageStore.pages(); pageId++, pageNo++) {
+                buf.clear();
+
+                pageStore.read(pageId, buf, true);
+            }
+
+            return null;
         }
+        catch (Throwable t) {
+            log.error("Integrity check of index partition of cache group " + gctx.cacheOrGroupName() + " failed", t);
+
+            return new IndexIntegrityCheckIssue(gctx.cacheOrGroupName(), t);
+        }
+        finally {
+            integrityCheckedIndexes.incrementAndGet();
 
-        return new VisorValidateIndexesJobResult(partResults, idxResults);
+            printProgressIfNeeded("Current progress of ValidateIndexesClosure: checked integrity of "
+                    + integrityCheckedIndexes.get() + " index partitions of " + totalCacheGrps + " cache groups");
+        }
     }
 
     /**
@@ -371,6 +481,19 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
                 if (cacheCtx == null)
                     throw new IgniteException("Unknown cacheId of CacheDataRow: " + cacheId);
 
+                if (row.link() == 0L) {
+                    String errMsg = "Invalid partition row, possibly deleted";
+
+                    log.error(errMsg);
+
+                    IndexValidationIssue is = new IndexValidationIssue(null, cacheCtx.name(), null,
+                            new IgniteCheckedException(errMsg));
+
+                    enoughIssues |= partRes.reportIssue(is);
+
+                    continue;
+                }
+
                 try {
                     QueryTypeDescriptorImpl res = (QueryTypeDescriptorImpl)m.invoke(
                         qryProcessor, cacheCtx.name(), cacheCtx.cacheObjectContext(), row.key(), row.value(), true);
@@ -392,6 +515,9 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
                     ArrayList<Index> indexes = gridH2Tbl.getIndexes();
 
                     for (Index idx : indexes) {
+                        if (!(idx instanceof H2TreeIndex))
+                            continue;
+
                         try {
                             Cursor cursor = idx.find((Session) null, h2Row, h2Row);
 
@@ -434,7 +560,7 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
         finally {
             part.release();
 
-            printProgressIfNeeded();
+            printProgressOfIndexValidationIfNeeded();
         }
 
         PartitionKey partKey = new PartitionKey(grpCtx.groupId(), part.id(), grpCtx.cacheOrGroupName());
@@ -447,16 +573,21 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
     /**
      *
      */
-    private void printProgressIfNeeded() {
-        long curTs = U.currentTimeMillis();
+    private void printProgressOfIndexValidationIfNeeded() {
+        printProgressIfNeeded("Current progress of ValidateIndexesClosure: processed " +
+                processedPartitions.get() + " of " + totalPartitions + " partitions, " +
+                processedIndexes.get() + " of " + totalIndexes + " SQL indexes");
+    }
 
+    /**
+     *
+     */
+    private void printProgressIfNeeded(String msg) {
+        long curTs = U.currentTimeMillis();
         long lastTs = lastProgressPrintTs.get();
 
-        if (curTs - lastTs >= 60_000 && lastProgressPrintTs.compareAndSet(lastTs, curTs)) {
-            log.warning("Current progress of ValidateIndexesClosure: processed " +
-                processedPartitions.get() + " of " + totalPartitions + " partitions, " +
-                processedIndexes.get() + " of " + totalIndexes + " SQL indexes");
-        }
+        if (curTs - lastTs >= 60_000 && lastProgressPrintTs.compareAndSet(lastTs, curTs))
+            log.warning(msg);
     }
 
     /**
@@ -546,12 +677,14 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
 
                 h2key = h2Row.key();
 
-                CacheDataRow cacheDataStoreRow = ctx.group().offheap().read(ctx, h2key);
+                if (h2Row.link() != 0L) {
+                    CacheDataRow cacheDataStoreRow = ctx.group().offheap().read(ctx, h2key);
 
-                if (cacheDataStoreRow == null)
-                    throw new IgniteCheckedException("Key is present in SQL index, but can't be found in CacheDataTree.");
-
-                previousKey = h2key;
+                    if (cacheDataStoreRow == null)
+                        throw new IgniteCheckedException("Key is present in SQL index, but can't be found in CacheDataTree.");
+                }
+                else
+                    throw new IgniteCheckedException("Invalid index row, possibly deleted " + h2Row);
             }
             catch (Throwable t) {
                 Object o = CacheObjectUtils.unwrapBinaryIfNeeded(
@@ -564,14 +697,34 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
 
                 enoughIssues |= idxValidationRes.reportIssue(is);
             }
+            finally {
+                previousKey = h2key;
+            }
         }
 
         String uniqueIdxName = "[cache=" + ctx.name() + ", idx=" + idx.getName() + "]";
 
         processedIndexes.incrementAndGet();
 
-        printProgressIfNeeded();
+        printProgressOfIndexValidationIfNeeded();
 
         return Collections.singletonMap(uniqueIdxName, idxValidationRes);
     }
+
+    /**
+     * @param e Exception thrown while waiting for a future: either interrupted or execution exception.
+     * @return Interrupted wrapper, the cause itself when it is an {@link IgniteException}, or a wrapper of the cause.
+     */
+    private IgniteException unwrapFutureException(Exception e) {
+        assert e instanceof InterruptedException || e instanceof ExecutionException : "Expecting either InterruptedException " +
+                "or ExecutionException";
+
+        if (e instanceof InterruptedException)
+            return new IgniteInterruptedException((InterruptedException)e);
+
+        Throwable cause = e.getCause();
+
+        return cause instanceof IgniteException ? (IgniteException)cause : new IgniteException(cause);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe02df8/modules/indexing/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexingTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexingTest.java b/modules/indexing/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexingTest.java
index 48e94c1..c7693d2 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexingTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexingTest.java
@@ -17,6 +17,9 @@
 
 package org.apache.ignite.util;
 
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -24,8 +27,8 @@ import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 import javax.cache.Cache;
 import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cache.QueryEntity;
@@ -39,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
 import org.apache.ignite.internal.processors.cache.tree.SearchRow;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
 import org.apache.ignite.internal.util.lang.GridIterator;
@@ -47,57 +51,35 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
 import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.INDEX_FILE_NAME;
 
 /**
  *
  */
 public class GridCommandHandlerIndexingTest extends GridCommandHandlerTest {
+    /** Test cache name. */
+    private static final String CACHE_NAME = "persons-cache-vi";
+
     /**
      * Tests that validation doesn't fail if nothing is broken.
      */
     public void testValidateIndexesNoErrors() throws Exception {
-        Ignite ignite = startGrids(2);
-
-        ignite.cluster().active(true);
-
-        Ignite client = startGrid("client");
-
-        String cacheName = "persons-cache-vi";
-
-        IgniteCache<Integer, Person> personCache = createPersonCache(client, cacheName);
-
-        ThreadLocalRandom rand = ThreadLocalRandom.current();
-
-        for (int i = 0; i < 10_000; i++)
-            personCache.put(i, new Person(rand.nextInt(), String.valueOf(rand.nextLong())));
+        prepareGridForTest();
 
         injectTestSystemOut();
 
-        assertEquals(EXIT_CODE_OK, execute("--cache", "validate_indexes", cacheName));
+        assertEquals(EXIT_CODE_OK, execute("--cache", "validate_indexes", CACHE_NAME));
 
-        assertTrue(testOut.toString().contains("validate_indexes has finished, no issues found"));
+        assertTrue(testOut.toString().contains("no issues found"));
     }
 
     /**
      * Tests that missing rows in CacheDataTree are detected.
      */
     public void testBrokenCacheDataTreeShouldFailValidation() throws Exception {
-        Ignite ignite = startGrids(2);
+        Ignite ignite = prepareGridForTest();
 
-        ignite.cluster().active(true);
-
-        Ignite client = startGrid("client");
-
-        String cacheName = "persons-cache-vi";
-
-        IgniteCache<Integer, Person> personCache = createPersonCache(client, cacheName);
-
-        ThreadLocalRandom rand = ThreadLocalRandom.current();
-
-        for (int i = 0; i < 10_000; i++)
-            personCache.put(i, new Person(rand.nextInt(), String.valueOf(rand.nextLong())));
-
-        breakCacheDataTree(ignite, cacheName, 1);
+        breakCacheDataTree(ignite, CACHE_NAME, 1);
 
         injectTestSystemOut();
 
@@ -105,11 +87,11 @@ public class GridCommandHandlerIndexingTest extends GridCommandHandlerTest {
             execute(
                 "--cache",
                 "validate_indexes",
-                cacheName,
+                CACHE_NAME,
                 "checkFirst", "10000",
                 "checkThrough", "10"));
 
-        assertTrue(testOut.toString().contains("validate_indexes has finished with errors"));
+        assertTrue(testOut.toString().contains("issues found (listed above)"));
 
         assertTrue(testOut.toString().contains(
             "Key is present in SQL index, but is missing in corresponding data page."));
@@ -119,6 +101,46 @@ public class GridCommandHandlerIndexingTest extends GridCommandHandlerTest {
      * Tests that missing rows in H2 indexes are detected.
      */
     public void testBrokenSqlIndexShouldFailValidation() throws Exception {
+        Ignite ignite = prepareGridForTest();
+
+        breakSqlIndex(ignite, CACHE_NAME);
+
+        injectTestSystemOut();
+
+        assertEquals(EXIT_CODE_OK, execute("--cache", "validate_indexes", CACHE_NAME));
+
+        assertTrue(testOut.toString().contains("issues found (listed above)"));
+    }
+
+    /**
+     * Tests that a corrupted index partition file is detected.
+     */
+    public void testCorruptedIndexPartitionShouldFailValidation() throws Exception {
+        Ignite ignite = prepareGridForTest();
+
+        forceCheckpoint();
+
+        File idxPath = indexPartition(ignite, CACHE_NAME);
+
+        stopAllGrids();
+
+        corruptIndexPartition(idxPath);
+
+        startGrids(2);
+
+        awaitPartitionMapExchange();
+
+        injectTestSystemOut();
+
+        assertEquals(EXIT_CODE_OK, execute("--cache", "validate_indexes", CACHE_NAME));
+
+        assertTrue(testOut.toString().contains("issues found (listed above)"));
+    }
+
+    /**
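+     * Starts and activates a two-node grid, creates the indexed test cache and streams 10 000 random persons.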
+     */
+    private Ignite prepareGridForTest() throws Exception{
         Ignite ignite = startGrids(2);
 
         ignite.cluster().active(true);
@@ -127,20 +149,52 @@ public class GridCommandHandlerIndexingTest extends GridCommandHandlerTest {
 
         String cacheName = "persons-cache-vi";
 
-        IgniteCache<Integer, Person> personCache = createPersonCache(client, cacheName);
+        client.getOrCreateCache(new CacheConfiguration<Integer, Person>()
+                .setName(cacheName)
+                .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+                .setAtomicityMode(CacheAtomicityMode.ATOMIC)
+                .setBackups(1)
+                .setQueryEntities(F.asList(personEntity(true, true)))
+                .setAffinity(new RendezvousAffinityFunction(false, 32)));
 
         ThreadLocalRandom rand = ThreadLocalRandom.current();
 
-        for (int i = 0; i < 10_000; i++)
-            personCache.put(i, new Person(rand.nextInt(), String.valueOf(rand.nextLong())));
+        try (IgniteDataStreamer<Integer, Person> streamer = client.dataStreamer(CACHE_NAME);) {
+            for (int i = 0; i < 10_000; i++)
+                streamer.addData(i, new Person(rand.nextInt(), String.valueOf(rand.nextLong())));
+        }
 
-        breakSqlIndex(ignite, cacheName);
+        return ignite;
+    }
 
-        injectTestSystemOut();
+    /**
+     * Returns the index partition file of the given cache on the given node.
+     */
+    private File indexPartition(Ignite ig, String cacheName) {
+        IgniteEx ig0 = (IgniteEx)ig;
 
-        assertEquals(EXIT_CODE_OK, execute("--cache", "validate_indexes", cacheName));
+        FilePageStoreManager pageStoreManager = ((FilePageStoreManager) ig0.context().cache().context().pageStore());
 
-        assertTrue(testOut.toString().contains("validate_indexes has finished with errors"));
+        return new File(pageStoreManager.cacheWorkDir(false, cacheName), INDEX_FILE_NAME);
+    }
+
+    /**
+     * Overwrites 1024 bytes at offset 4096 of the index partition file with random data.
+     */
+    private void corruptIndexPartition(File path) throws IOException {
+        assertTrue(path.exists());
+
+        ThreadLocalRandom rand = ThreadLocalRandom.current();
+
+        try (RandomAccessFile idx = new RandomAccessFile(path, "rw")) {
+            byte[] trash = new byte[1024];
+
+            rand.nextBytes(trash);
+
+            idx.seek(4096);
+
+            idx.write(trash);
+        }
     }
 
     /**
@@ -242,22 +296,6 @@ public class GridCommandHandlerIndexingTest extends GridCommandHandlerTest {
     }
 
     /**
-     * Dynamically creates cache with SQL indexes.
-     *
-     * @param ig Client.
-     * @param cacheName Cache name.
-     */
-    private IgniteCache<Integer, Person> createPersonCache(Ignite ig, String cacheName) {
-        return ig.getOrCreateCache(new CacheConfiguration<Integer, Person>()
-            .setName(cacheName)
-            .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
-            .setAtomicityMode(CacheAtomicityMode.ATOMIC)
-            .setBackups(1)
-            .setQueryEntities(F.asList(personEntity(true, true)))
-            .setAffinity(new RendezvousAffinityFunction(false, 32)));
-    }
-
-    /**
      * @param idxName Index name.
      * @param idxOrgId Index org id.
      */