You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/03/14 09:24:18 UTC

ignite git commit: WIP on correct discovery data sharing.

Repository: ignite
Updated Branches:
  refs/heads/ignite-4565-ddl f8a5fef56 -> 4a5bb9375


WIP on correct discovery data sharing.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4a5bb937
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4a5bb937
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4a5bb937

Branch: refs/heads/ignite-4565-ddl
Commit: 4a5bb9375556a0b141b71b5431f3b291e1287a3e
Parents: f8a5fef
Author: devozerov <vo...@gridgain.com>
Authored: Tue Mar 14 12:24:09 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Tue Mar 14 12:24:09 2017 +0300

----------------------------------------------------------------------
 .../cache/DynamicCacheDescriptor.java           |  18 +++
 .../processors/cache/GridCacheProcessor.java    | 138 ++++++++++++++++++-
 .../cache/query/GridCacheQueryManager.java      |  12 +-
 .../processors/query/GridQueryProcessor.java    |  43 +-----
 .../query/ddl/AbstractIndexOperation.java       |  14 +-
 .../query/ddl/CreateIndexOperation.java         |  13 +-
 .../query/ddl/DropIndexOperation.java           |  13 +-
 .../ddl/IndexAbstractDiscoveryMessage.java      |   2 +
 .../query/ddl/IndexAckDiscoveryMessage.java     |  29 ++++
 .../processors/query/h2/IgniteH2Indexing.java   |   2 +-
 10 files changed, 211 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4a5bb937/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index 8b62f03..5ada093 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -24,6 +24,7 @@ import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.plugin.CachePluginManager;
+import org.apache.ignite.internal.processors.query.ddl.AbstractIndexOperation;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -81,6 +82,9 @@ public class DynamicCacheDescriptor {
     /** */
     private AffinityTopologyVersion rcvdFromVer;
 
+    /** Index operation in init phase. */
+    private AbstractIndexOperation idxInitOp;
+
     /**
      * @param ctx Context.
      * @param cacheCfg Cache configuration.
@@ -302,6 +306,20 @@ public class DynamicCacheDescriptor {
         return rcvdFrom;
     }
 
+    /**
+     * @return Pending index init operation.
+     */
+    public AbstractIndexOperation indexInitOperation() {
+        return idxInitOp;
+    }
+
+    /**
+     * @param idxInitOp Pending index init operation.
+     */
+    public void indexInitOperation(AbstractIndexOperation idxInitOp) {
+        this.idxInitOp = idxInitOp;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(DynamicCacheDescriptor.class, this, "cacheName", U.maskName(cacheCfg.getName()));

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a5bb937/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 77702f0..9b33df0 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -65,6 +65,12 @@ import org.apache.ignite.configuration.TransactionConfiguration;
 import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.processors.query.ddl.AbstractIndexOperation;
+import org.apache.ignite.internal.processors.query.ddl.CreateIndexOperation;
+import org.apache.ignite.internal.processors.query.ddl.DropIndexOperation;
+import org.apache.ignite.internal.processors.query.ddl.IndexAbstractDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.ddl.IndexAckDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.ddl.IndexInitDiscoveryMessage;
 import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteComponentType;
@@ -104,6 +110,7 @@ import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProces
 import org.apache.ignite.internal.processors.plugin.CachePluginManager;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.F0;
+import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -205,6 +212,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /** */
     private Map<UUID, DynamicCacheChangeBatch> clientReconnectReqs;
 
+    /** ID history for index create/drop discovery messages. */
+    private final GridBoundedConcurrentLinkedHashSet<IgniteUuid> idxDiscoMsgIdHist =
+        new GridBoundedConcurrentLinkedHashSet<>(QueryUtils.discoveryHistorySize());
+
     /**
      * @param ctx Kernal context.
      */
@@ -1075,6 +1086,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     private void startCache(GridCacheAdapter<?, ?> cache) throws IgniteCheckedException {
         GridCacheContext<?, ?> cacheCtx = cache.context();
 
+        // TODO: Make sure that pending init operation is passed to indexing.
         ctx.query().onCacheStart(cacheCtx);
         ctx.continuous().onCacheStart(cacheCtx);
 
@@ -2532,7 +2544,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         if (cache == null)
             return new GridFinishedFuture<>(new IgniteException("Cache doesn't exist: " + cacheName));
 
-        return cache.context().queries().createIndex(tblName, idx, ifNotExists);
+        return cache.context().queries().dynamicIndexCreate(tblName, idx, ifNotExists);
     }
 
     /**
@@ -2673,6 +2685,18 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @return {@code True} if minor topology version should be increased.
      */
     public boolean onCustomEvent(DiscoveryCustomMessage msg, AffinityTopologyVersion topVer) {
+        if (msg instanceof IndexInitDiscoveryMessage) {
+            onIndexInitDiscoveryMessage((IndexInitDiscoveryMessage)msg);
+
+            return false;
+        }
+
+        if (msg instanceof IndexAckDiscoveryMessage) {
+            onIndexAckDiscoveryMessage((IndexAckDiscoveryMessage)msg);
+
+            return false;
+        }
+
         if (msg instanceof CacheAffinityChangeMessage)
             return sharedCtx.affinity().onCustomEvent(((CacheAffinityChangeMessage)msg));
 
@@ -2680,6 +2704,98 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Handle cache index init discovery message.
+     *
+     * @param msg Message.
+     */
+    private void onIndexInitDiscoveryMessage(IndexInitDiscoveryMessage msg) {
+        UUID locNodeId = ctx.localNodeId();
+
+        if (!indexMessageValid(msg))
+            return;
+
+        AbstractIndexOperation op = msg.operation();
+
+        // Ignore in case error was reported by another node earlier.
+        if (msg.hasError()) {
+            if (log.isDebugEnabled())
+                log.debug("Received index init message with error reported by another node (will ignore): " + msg);
+
+            return;
+        }
+
+        // Ensure cache exists.
+        DynamicCacheDescriptor desc = cacheDescriptor(op.space());
+
+        if (desc == null) {
+            msg.onError(locNodeId, "Cache doesn't exit on node [cacheName=" + op.space() +
+                ". nodeId=" + locNodeId + ']');
+
+            return;
+        }
+
+        // Validate request on descriptor level.
+        AbstractIndexOperation oldOp = desc.indexInitOperation();
+
+        if (oldOp != null) {
+            msg.onError(locNodeId, "Failed to create/drop cache index because another pending operation is in " +
+                "progress [cacheName=" + op.space() + ". newOp=" + op + ", oldOp=" + oldOp + ']');
+
+            return;
+        }
+
+        if (op instanceof CreateIndexOperation) {
+            CreateIndexOperation op0 = (CreateIndexOperation)op;
+
+            // TODO
+        }
+        else if (op instanceof DropIndexOperation) {
+            DropIndexOperation op0 = (DropIndexOperation)op;
+
+            // TODO
+        }
+        else
+            U.warn(log, "Received index init message with unsupported operation (will ignore): " + msg);
+
+        // For already started cache we must make sure that indexing manager will be able to accommodate it.
+        if (isMissingQueryCache(desc))
+            cache(op.space()).context().queries().onIndexInitDiscoveryMessage(msg);
+
+        // Finally, set init operation to cache descriptor.
+        if (!msg.hasError())
+            desc.indexInitOperation(op);
+    }
+
+    /**
+     * Handle cache index ack discovery message.
+     *
+     * @param msg Message.
+     */
+    private void onIndexAckDiscoveryMessage(IndexAckDiscoveryMessage msg) {
+        if (!indexMessageValid(msg))
+            return;
+
+        // TODO
+    }
+
+    /**
+     * Ensure that arrived discovery message is not duplicate.
+     *
+     * @param msg Message.
+     * @return {@code True} if message is not duplicated and should be processed further.
+     */
+    private boolean indexMessageValid(IndexAbstractDiscoveryMessage msg) {
+        IgniteUuid id = msg.id();
+
+        boolean res = idxDiscoMsgIdHist.add(id);
+
+        if (!res)
+            U.warn(log, "Received duplicate index change discovery message (will ignore): " + msg);
+
+        return res;
+    }
+
+    /**
      * @param batch Change request batch.
      * @param topVer Current topology version.
      * @return {@code True} if minor topology version should be increased.
@@ -3569,16 +3685,28 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      *
      * @throws IgniteCheckedException In case of error.
      */
-    public void createMissingCaches() throws IgniteCheckedException {
+    public void createMissingQueryCaches() throws IgniteCheckedException {
         for (Map.Entry<String, DynamicCacheDescriptor> e : registeredCaches.entrySet()) {
-            CacheConfiguration ccfg = e.getValue().cacheConfiguration();
+            DynamicCacheDescriptor desc = e.getValue();
 
-            if (!caches.containsKey(maskNull(ccfg.getName())) && QueryUtils.isEnabled(ccfg))
-                dynamicStartCache(null, ccfg.getName(), null, false, true, true).get();
+            if (isMissingQueryCache(desc))
+                dynamicStartCache(null, desc.cacheConfiguration().getName(), null, false, true, true).get();
         }
     }
 
     /**
+     * Whether cache defined by provided descriptor is not yet started and has queries enabled.
+     *
+     * @param desc Descriptor.
+     * @return {@code True} if this is missing query cache.
+     */
+    private boolean isMissingQueryCache(DynamicCacheDescriptor desc) {
+        CacheConfiguration ccfg = desc.cacheConfiguration();
+
+        return !caches.containsKey(maskNull(ccfg.getName())) && QueryUtils.isEnabled(ccfg);
+    }
+
+    /**
      * Registers MBean for cache components.
      *
      * @param o Cache component.

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a5bb937/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 30858f9..c7d5feb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -86,6 +86,7 @@ import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.processors.query.ddl.IndexInitDiscoveryMessage;
 import org.apache.ignite.internal.processors.task.GridInternal;
 import org.apache.ignite.internal.util.GridBoundedPriorityQueue;
 import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
@@ -518,7 +519,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
      * @param ifNotExists When set to {@code true} operation will fail if index already exists.
      * @return Future completed when index is created.
      */
-    public IgniteInternalFuture<?> createIndex(String tblName, QueryIndex idx, boolean ifNotExists) {
+    public IgniteInternalFuture<?> dynamicIndexCreate(String tblName, QueryIndex idx, boolean ifNotExists) {
         if (!enterBusy())
             return new GridFinishedFuture<>(new IgniteException("Failed to create index because " +
                 "local node is stopping."));
@@ -536,6 +537,15 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
     }
 
     /**
+     * Validate index init discovery message.
+     *
+     * @param msg Message.
+     */
+    public void onIndexInitDiscoveryMessage(IndexInitDiscoveryMessage msg) {
+        qryProc.onIndexInitDiscoveryMessage(space, msg);
+    }
+
+    /**
      * Undeploys given class loader.
      *
      * @param ldr Class loader to undeploy.

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a5bb937/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index a65b318..b8fa36f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -42,12 +42,10 @@ import org.apache.ignite.cache.QueryIndex;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.SqlQuery;
-import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.events.CacheQueryExecutedEvent;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.managers.discovery.CustomEventListener;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
@@ -65,7 +63,6 @@ import org.apache.ignite.internal.processors.query.ddl.IndexAbstractDiscoveryMes
 import org.apache.ignite.internal.processors.query.ddl.IndexAckDiscoveryMessage;
 import org.apache.ignite.internal.processors.query.ddl.IndexInitDiscoveryMessage;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
-import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.lang.GridCloseableIterator;
@@ -113,10 +110,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     /** Index create/drop client futures. */
     private final ConcurrentMap<UUID, QueryIndexClientFuture> idxCliFuts = new ConcurrentHashMap<>();
 
-    /** ID history for index create/drop discovery messages. */
-    private final GridBoundedConcurrentLinkedHashSet<IgniteUuid> idxDiscoMsgIdHist =
-        new GridBoundedConcurrentLinkedHashSet<>(QueryUtils.discoveryHistorySize());
-
     /** */
     private GridTimeoutProcessor.CancelableTask qryDetailMetricsEvictTask;
 
@@ -148,23 +141,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
             idx.start(ctx, busyLock);
         }
 
-        ctx.discovery().setCustomEventListener(IndexAbstractDiscoveryMessage.class,
-            new CustomEventListener<IndexAbstractDiscoveryMessage>() {
-                /** {@inheritDoc} */
-                @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "unchecked"})
-                @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd,
-                    IndexAbstractDiscoveryMessage msg) {
-                    if (notDuplicate(msg)) {
-                        if (msg instanceof IndexInitDiscoveryMessage)
-                            onIndexInitDiscoveryMessage((IndexInitDiscoveryMessage)msg);
-                        if (msg instanceof IndexAckDiscoveryMessage)
-                            onIndexAckDiscoveryMessage((IndexAckDiscoveryMessage)msg);
-                        else
-                            U.warn(log, "Unexpected custom discovery message [msg=" + msg + ']');
-                    }
-                }
-            });
-
         // Schedule queries detail metrics eviction.
         qryDetailMetricsEvictTask = ctx.timeout().schedule(new Runnable() {
             @Override public void run() {
@@ -314,13 +290,10 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     /**
      * Handle index init discovery message.
      *
+     * @param space Space.
      * @param msg Message.
      */
-    private void onIndexInitDiscoveryMessage(IndexInitDiscoveryMessage msg) {
-        // Return message with existing error.
-        if (msg.hasError())
-            return;
-
+    public void onIndexInitDiscoveryMessage(String space, IndexInitDiscoveryMessage msg) {
         // Validate.
         idxLock.writeLock().lock();
 
@@ -1173,18 +1146,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * Ensuret that arrived discovery message is not duplicate.
-     *
-     * @param msg Message.
-     * @return {@code True} if message is not duplicated and should be processed further.
-     */
-    private boolean notDuplicate(IndexAbstractDiscoveryMessage msg) {
-        IgniteUuid id = msg.id();
-
-        return idxDiscoMsgIdHist.add(id);
-    }
-
-    /**
      * @param ver Version.
      */
     public static void setRequestAffinityTopologyVersion(AffinityTopologyVersion ver) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a5bb937/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/AbstractIndexOperation.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/AbstractIndexOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/AbstractIndexOperation.java
index cfa8f24..24600e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/AbstractIndexOperation.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/AbstractIndexOperation.java
@@ -33,15 +33,20 @@ public abstract class AbstractIndexOperation implements Serializable {
     /** Operation ID. */
     private final UUID opId;
 
+    /** Space. */
+    private final String space;
+
     /**
      * Constructor.
      *
      * @param cliNodeId Client node ID.
      * @param opId Operation ID.
+     * @param space Space.
      */
-    public AbstractIndexOperation(UUID cliNodeId, UUID opId) {
+    public AbstractIndexOperation(UUID cliNodeId, UUID opId, String space) {
         this.cliNodeId = cliNodeId;
         this.opId = opId;
+        this.space = space;
     }
 
     /**
@@ -57,4 +62,11 @@ public abstract class AbstractIndexOperation implements Serializable {
     public UUID operationId() {
         return opId;
     }
+
+    /**
+     * @return Space.
+     */
+    public String space() {
+        return space;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a5bb937/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/CreateIndexOperation.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/CreateIndexOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/CreateIndexOperation.java
index 125d0e2..7030586 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/CreateIndexOperation.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/CreateIndexOperation.java
@@ -30,9 +30,6 @@ public class CreateIndexOperation extends AbstractIndexOperation {
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Space. */
-    private final String space;
-
     /** Table name. */
     private final String tblName;
 
@@ -55,9 +52,8 @@ public class CreateIndexOperation extends AbstractIndexOperation {
      */
     public CreateIndexOperation(UUID cliNodeId, UUID opId, String space, String tblName, QueryIndex idx,
         boolean ifNotExists) {
-        super(cliNodeId, opId);
+        super(cliNodeId, opId, space);
 
-        this.space = space;
         this.tblName = tblName;
         this.idx = idx;
         this.ifNotExists = ifNotExists;
@@ -71,13 +67,6 @@ public class CreateIndexOperation extends AbstractIndexOperation {
     }
 
     /**
-     * @return Schema name.
-     */
-    public String space() {
-        return space;
-    }
-
-    /**
      * @return Table name.
      */
     public String tableName() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a5bb937/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/DropIndexOperation.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/DropIndexOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/DropIndexOperation.java
index 14c3ffd..be2941c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/DropIndexOperation.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/DropIndexOperation.java
@@ -28,9 +28,6 @@ public class DropIndexOperation extends AbstractIndexOperation {
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Space. */
-    private final String space;
-
     /** Index name. */
     private final String idxName;
 
@@ -47,21 +44,13 @@ public class DropIndexOperation extends AbstractIndexOperation {
      * @param ifExists Ignore operation if index doesn't exist.
      */
     DropIndexOperation(UUID cliNodeId, UUID opId, String space, String idxName, boolean ifExists) {
-        super(cliNodeId, opId);
+        super(cliNodeId, opId, space);
 
-        this.space = space;
         this.idxName = idxName;
         this.ifExists = ifExists;
     }
 
     /**
-     * @return Space.
-     */
-    public String space() {
-        return space;
-    }
-
-    /**
      * @return Index name.
      */
     public String indexName() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a5bb937/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java
index b7599e6..2912986 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.query.ddl;
 
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteUuid;
 
@@ -32,6 +33,7 @@ public abstract class IndexAbstractDiscoveryMessage implements DiscoveryCustomMe
     private final IgniteUuid id = IgniteUuid.randomUuid();
 
     /** Operation. */
+    @GridToStringInclude
     private final AbstractIndexOperation op;
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a5bb937/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAckDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAckDiscoveryMessage.java
index 44a89fc..4fb224d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAckDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAckDiscoveryMessage.java
@@ -21,6 +21,8 @@ import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.Nullable;
 
+import java.util.UUID;
+
 /**
  * {@code ACK} message which triggers local index create/drop.
  */
@@ -28,6 +30,12 @@ public class IndexAckDiscoveryMessage extends IndexAbstractDiscoveryMessage {
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** ID of the node that reported an error. */
+    private UUID errNodeId;
+
+    /** Error message. */
+    private String errMsg;
+
     /**
      * Constructor.
      *
@@ -47,6 +55,27 @@ public class IndexAckDiscoveryMessage extends IndexAbstractDiscoveryMessage {
         return false;
     }
 
+    /**
+     * @return {@code True} if error was reported during init.
+     */
+    public boolean hasError() {
+        return errNodeId != null;
+    }
+
+    /**
+     * @return ID of the node that reported an error (if any).
+     */
+    @Nullable public UUID errorNodeId() {
+        return errNodeId;
+    }
+
+    /**
+     * @return Error message (if any).
+     */
+    @Nullable public String errorMessage() {
+        return errMsg;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(IndexAckDiscoveryMessage.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a5bb937/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 719897a..bfe51a2 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1449,7 +1449,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                         catch (SQLException e) {
                             if (!cachesCreated && e.getErrorCode() == ErrorCode.SCHEMA_NOT_FOUND_1) {
                                 try {
-                                    ctx.cache().createMissingCaches();
+                                    ctx.cache().createMissingQueryCaches();
                                 }
                                 catch (IgniteCheckedException ignored) {
                                     throw new CacheException("Failed to create missing caches.", e);