You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/03/14 12:34:48 UTC

ignite git commit: Introduced index finish discovery message to make sure that current coordinator always know current pending operations.

Repository: ignite
Updated Branches:
  refs/heads/ignite-4565-ddl 98e118778 -> 8e9ba9751


Introduced index finish discovery message to make sure that current coordinator always know current pending operations.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8e9ba975
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8e9ba975
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8e9ba975

Branch: refs/heads/ignite-4565-ddl
Commit: 8e9ba97517747f5f74bad9f35b5a25fe36c38bd0
Parents: 98e1187
Author: devozerov <vo...@gridgain.com>
Authored: Tue Mar 14 15:33:54 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Tue Mar 14 15:33:54 2017 +0300

----------------------------------------------------------------------
 .../cache/CacheAffinitySharedManager.java       |   2 +-
 .../cache/DynamicCacheDescriptor.java           |  16 +--
 .../processors/cache/GridCacheProcessor.java    |  76 ++++++-------
 .../cache/query/GridCacheQueryManager.java      |   6 +-
 .../processors/query/GridQueryProcessor.java    |  10 +-
 .../query/ddl/IndexAcceptDiscoveryMessage.java  |  57 ++++++++++
 .../query/ddl/IndexAckDiscoveryMessage.java     |  88 ---------------
 .../query/ddl/IndexFinishDiscoveryMessage.java  |  88 +++++++++++++++
 .../query/ddl/IndexInitDiscoveryMessage.java    | 109 ------------------
 .../query/ddl/IndexProposeDiscoveryMessage.java | 111 +++++++++++++++++++
 10 files changed, 308 insertions(+), 255 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8e9ba975/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index abea10b..d8eb733 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -352,7 +352,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                     false,
                     req.deploymentId());
 
-                desc.indexInitOperation(req.indexInitOperation());
+                desc.indexProposeOperation(req.indexInitOperation());
 
                 DynamicCacheDescriptor old = registeredCaches.put(cacheId, desc);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e9ba975/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index 5ada093..a8febef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -82,8 +82,8 @@ public class DynamicCacheDescriptor {
     /** */
     private AffinityTopologyVersion rcvdFromVer;
 
-    /** Index operation in init phase. */
-    private AbstractIndexOperation idxInitOp;
+    /** Pending index operation in propose phase. */
+    private AbstractIndexOperation idxProposeOp;
 
     /**
      * @param ctx Context.
@@ -307,17 +307,17 @@ public class DynamicCacheDescriptor {
     }
 
     /**
-     * @return Pending index init operation.
+     * @return Pending index propose operation.
      */
-    public AbstractIndexOperation indexInitOperation() {
-        return idxInitOp;
+    public AbstractIndexOperation indexProposeOperation() {
+        return idxProposeOp;
     }
 
     /**
-     * @param idxInitOp Pending index init operation.
+     * @param idxProposeOp Pending index propose operation.
      */
-    public void indexInitOperation(AbstractIndexOperation idxInitOp) {
-        this.idxInitOp = idxInitOp;
+    public void indexProposeOperation(AbstractIndexOperation idxProposeOp) {
+        this.idxProposeOp = idxProposeOp;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e9ba975/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 619d4a4..6a9f838 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -67,8 +67,9 @@ import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.processors.query.ddl.AbstractIndexOperation;
 import org.apache.ignite.internal.processors.query.ddl.IndexAbstractDiscoveryMessage;
-import org.apache.ignite.internal.processors.query.ddl.IndexAckDiscoveryMessage;
-import org.apache.ignite.internal.processors.query.ddl.IndexInitDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.ddl.IndexAcceptDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.ddl.IndexFinishDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.ddl.IndexProposeDiscoveryMessage;
 import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteComponentType;
@@ -1984,7 +1985,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             req.cacheType(desc.cacheType());
             req.deploymentId(desc.deploymentId());
             req.receivedFrom(desc.receivedFrom());
-            req.indexInitOperation(desc.indexInitOperation());
+            req.indexInitOperation(desc.indexProposeOperation());
 
             reqs.add(req);
         }
@@ -2022,7 +2023,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             req.cacheType(desc.cacheType());
             req.deploymentId(desc.deploymentId());
             req.receivedFrom(desc.receivedFrom());
-            req.indexInitOperation(desc.indexInitOperation());
+            req.indexInitOperation(desc.indexProposeOperation());
 
             reqs.add(req);
 
@@ -2128,7 +2129,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                                     ccfg.getCacheMode());
                         }
 
-                        existing.indexInitOperation(req.indexInitOperation());
+                        existing.indexProposeOperation(req.indexInitOperation());
                     }
                     else {
                         assert req.cacheType() != null : req;
@@ -2140,7 +2141,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                                 false,
                                 req.deploymentId());
 
-                        desc.indexInitOperation(req.indexInitOperation());
+                        desc.indexProposeOperation(req.indexInitOperation());
 
                         // Received statically configured cache.
                         if (req.initiatingNodeId() == null)
@@ -2684,14 +2685,25 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @return {@code True} if minor topology version should be increased.
      */
     public boolean onCustomEvent(DiscoveryCustomMessage msg, AffinityTopologyVersion topVer) {
-        if (msg instanceof IndexInitDiscoveryMessage) {
-            onIndexInitDiscoveryMessage((IndexInitDiscoveryMessage)msg);
+        if (msg instanceof IndexAbstractDiscoveryMessage) {
+            IndexAbstractDiscoveryMessage msg0 = (IndexAbstractDiscoveryMessage)msg;
 
-            return false;
-        }
+            IgniteUuid id = msg0.id();
+
+            boolean res = idxDiscoMsgIdHist.add(id);
 
-        if (msg instanceof IndexAckDiscoveryMessage) {
-            onIndexAckDiscoveryMessage((IndexAckDiscoveryMessage)msg);
+            if (!idxDiscoMsgIdHist.add(id))
+                U.warn(log, "Received duplicate index change discovery message (will ignore): " + msg);
+            else {
+                if (msg instanceof IndexProposeDiscoveryMessage)
+                    onIndexProposeMessage((IndexProposeDiscoveryMessage)msg);
+                else if (msg instanceof IndexAcceptDiscoveryMessage)
+                    onIndexAcceptMessage((IndexAcceptDiscoveryMessage)msg);
+                else if (msg instanceof IndexFinishDiscoveryMessage)
+                    onIndexfinishMessage((IndexFinishDiscoveryMessage)msg);
+                else
+                    U.warn(log, "Unsupported index discovery message type (will ignore): " + msg);
+            }
 
             return false;
         }
@@ -2707,12 +2719,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      *
      * @param msg Message.
      */
-    private void onIndexInitDiscoveryMessage(IndexInitDiscoveryMessage msg) {
+    private void onIndexProposeMessage(IndexProposeDiscoveryMessage msg) {
         UUID locNodeId = ctx.localNodeId();
 
-        if (!indexMessageValid(msg))
-            return;
-
         AbstractIndexOperation op = msg.operation();
 
         // Ignore in case error was reported by another node earlier.
@@ -2734,7 +2743,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         }
 
         // Validate request at descriptor level.
-        AbstractIndexOperation oldOp = desc.indexInitOperation();
+        AbstractIndexOperation oldOp = desc.indexProposeOperation();
 
         if (oldOp != null) {
             msg.onError(locNodeId, "Failed to create/drop cache index because another pending operation is in " +
@@ -2745,11 +2754,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         // For already started cache we must make sure that indexing manager will be able to accommodate it.
         if (!isMissingQueryCache(desc))
-            cache(op.space()).context().queries().onIndexInitDiscoveryMessage(msg);
+            cache(op.space()).context().queries().onIndexProposeMessage(msg);
 
         // Finally, set init operation to cache descriptor.
         if (!msg.hasError())
-            desc.indexInitOperation(op);
+            desc.indexProposeOperation(op);
     }
 
     /**
@@ -2757,16 +2766,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      *
      * @param msg Message.
      */
-    private void onIndexAckDiscoveryMessage(IndexAckDiscoveryMessage msg) {
-        if (!indexMessageValid(msg))
-            return;
-
-        if (msg.hasError()) {
-            // TODO: Delegate to indexing to handle error and complete client futures!
-
-            return;
-        }
-
+    private void onIndexAcceptMessage(IndexAcceptDiscoveryMessage msg) {
         // TODO: Remove init operation from descriptor!
 
         // TODO: Handle concurrent cache stop!
@@ -2777,20 +2777,14 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * Ensure that arrived discovery message is not duplicate.
+     * Handle cache index ack discovery message.
      *
      * @param msg Message.
-     * @return {@code True} if message is not duplicated and should be processed further.
      */
-    private boolean indexMessageValid(IndexAbstractDiscoveryMessage msg) {
-        IgniteUuid id = msg.id();
-
-        boolean res = idxDiscoMsgIdHist.add(id);
+    private void onIndexfinishMessage(IndexFinishDiscoveryMessage msg) {
+        // TODO: Clear dynamic descriptors!
 
-        if (!res)
-            U.warn(log, "Received duplicate index change discovery message (will ignore): " + msg);
-
-        return res;
+        // TODO: Delegate to indexing to handle result and complete client futures!
     }
 
     /**
@@ -2862,7 +2856,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                         DynamicCacheDescriptor startDesc =
                             new DynamicCacheDescriptor(ctx, ccfg, req.cacheType(), false, req.deploymentId());
 
-                        startDesc.indexInitOperation(req.indexInitOperation());
+                        startDesc.indexProposeOperation(req.indexInitOperation());
 
                         if (newTopVer == null) {
                             newTopVer = new AffinityTopologyVersion(topVer.topologyVersion(),
@@ -3926,7 +3920,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
                     req.deploymentId(desc.deploymentId());
                     req.startCacheConfiguration(descCfg);
-                    req.indexInitOperation(desc.indexInitOperation());
+                    req.indexInitOperation(desc.indexProposeOperation());
                 }
             }
             else {

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e9ba975/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index c7d5feb..fe53a1a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -86,7 +86,7 @@ import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.processors.query.QueryUtils;
-import org.apache.ignite.internal.processors.query.ddl.IndexInitDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.ddl.IndexProposeDiscoveryMessage;
 import org.apache.ignite.internal.processors.task.GridInternal;
 import org.apache.ignite.internal.util.GridBoundedPriorityQueue;
 import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
@@ -541,8 +541,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
      *
      * @param msg Message.
      */
-    public void onIndexInitDiscoveryMessage(IndexInitDiscoveryMessage msg) {
-        qryProc.onIndexInitDiscoveryMessage(space, msg);
+    public void onIndexProposeMessage(IndexProposeDiscoveryMessage msg) {
+        qryProc.onIndexProposeMessage(space, msg);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e9ba975/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index eb984c9..d0d9374 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -59,8 +59,8 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
 import org.apache.ignite.internal.processors.query.ddl.AbstractIndexOperation;
 import org.apache.ignite.internal.processors.query.ddl.CreateIndexOperation;
 import org.apache.ignite.internal.processors.query.ddl.DropIndexOperation;
-import org.apache.ignite.internal.processors.query.ddl.IndexAckDiscoveryMessage;
-import org.apache.ignite.internal.processors.query.ddl.IndexInitDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.ddl.IndexAcceptDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.ddl.IndexProposeDiscoveryMessage;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -291,7 +291,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      * @param space Space.
      * @param msg Message.
      */
-    public void onIndexInitDiscoveryMessage(String space, IndexInitDiscoveryMessage msg) {
+    public void onIndexProposeMessage(String space, IndexProposeDiscoveryMessage msg) {
         // Validate.
         idxLock.writeLock().lock();
 
@@ -372,7 +372,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      *
      * @param msg Message.
      */
-    private void onIndexAckDiscoveryMessage(String space, IndexAckDiscoveryMessage msg) {
+    private void onIndexAckDiscoveryMessage(String space, IndexAcceptDiscoveryMessage msg) {
         // TODO
     }
 
@@ -803,7 +803,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                 ifNotExists);
 
             try {
-                ctx.discovery().sendCustomEvent(new IndexInitDiscoveryMessage(op));
+                ctx.discovery().sendCustomEvent(new IndexProposeDiscoveryMessage(op));
             }
             catch (IgniteCheckedException e) {
                 return new GridFinishedFuture<>(new IgniteException("Failed to start index create opeartion due to " +

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e9ba975/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAcceptDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAcceptDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAcceptDiscoveryMessage.java
new file mode 100644
index 0000000..d0fed43
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAcceptDiscoveryMessage.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.ddl;
+
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.UUID;
+
+/**
+ * {@code ACK} message which triggers local index create/drop.
+ */
+public class IndexAcceptDiscoveryMessage extends IndexAbstractDiscoveryMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Constructor.
+     *
+     * @param op Original operation.
+     */
+    public IndexAcceptDiscoveryMessage(AbstractIndexOperation op) {
+        super(op);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isMutable() {
+        return false;
+    }
+
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IndexAcceptDiscoveryMessage.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e9ba975/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAckDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAckDiscoveryMessage.java
deleted file mode 100644
index 96b6dcd..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAckDiscoveryMessage.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.ddl;
-
-import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.jetbrains.annotations.Nullable;
-
-import java.util.UUID;
-
-/**
- * {@code ACK} message which triggers local index create/drop.
- */
-public class IndexAckDiscoveryMessage extends IndexAbstractDiscoveryMessage {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Node reported an error. */
-    private final UUID errNodeId;
-
-    /** Error message. */
-    private final String errMsg;
-
-    /**
-     * Constructor.
-     *
-     * @param op Original operation.
-     * @param errNodeId Node reported an error.
-     * @param errMsg Error message.
-     */
-    public IndexAckDiscoveryMessage(AbstractIndexOperation op, UUID errNodeId, String errMsg) {
-        super(op);
-
-        this.errNodeId = errNodeId;
-        this.errMsg = errMsg;
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public DiscoveryCustomMessage ackMessage() {
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean isMutable() {
-        return false;
-    }
-
-    /**
-     * @return {@code True} if error was reported during init.
-     */
-    public boolean hasError() {
-        return errNodeId != null;
-    }
-
-    /**
-     * @return ID of the node reported an error (if any).
-     */
-    @Nullable public UUID errorNodeId() {
-        return errNodeId;
-    }
-
-    /**
-     * @return Error message (if any).
-     */
-    @Nullable public String errorMessage() {
-        return errMsg;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(IndexAckDiscoveryMessage.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e9ba975/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexFinishDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexFinishDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexFinishDiscoveryMessage.java
new file mode 100644
index 0000000..5a2d66c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexFinishDiscoveryMessage.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.ddl;
+
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.UUID;
+
+/**
+ * Index creation finished discovery message.
+ */
+public class IndexFinishDiscoveryMessage extends IndexAbstractDiscoveryMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Node reported an error. */
+    private final UUID errNodeId;
+
+    /** Error message. */
+    private final String errMsg;
+    /**
+     * Constructor.
+     *
+     * @param op Original operation.
+     * @param errNodeId Node reported an error.
+     * @param errMsg Error message.
+     */
+    public IndexFinishDiscoveryMessage(AbstractIndexOperation op, UUID errNodeId, String errMsg) {
+        super(op);
+
+        this.errNodeId = errNodeId;
+        this.errMsg = errMsg;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isMutable() {
+        return false;
+    }
+
+    /**
+     * @return {@code True} if error was reported during init.
+     */
+    public boolean hasError() {
+        return errNodeId != null;
+    }
+
+    /**
+     * @return ID of the node reported an error (if any).
+     */
+    @Nullable public UUID errorNodeId() {
+        return errNodeId;
+    }
+
+    /**
+     * @return Error message (if any).
+     */
+    @Nullable public String errorMessage() {
+        return errMsg;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IndexFinishDiscoveryMessage.class, this);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e9ba975/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexInitDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexInitDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexInitDiscoveryMessage.java
deleted file mode 100644
index e47c901..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexInitDiscoveryMessage.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.ddl;
-
-import org.apache.ignite.internal.ContextAware;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.jetbrains.annotations.Nullable;
-
-import java.util.UUID;
-
-/**
- * {@code INIT} part of a distributed index create/drop operation.
- */
-public class IndexInitDiscoveryMessage extends IndexAbstractDiscoveryMessage implements ContextAware {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    // TODO: Do we really need it?
-    /** Kernal context. */
-    @GridToStringExclude
-    private transient GridKernalContext ctx;
-
-    /** Node reported an error. */
-    private UUID errNodeId;
-
-    /** Error message. */
-    private String errMsg;
-
-    /**
-     * Constructor.
-     *
-     * @param op Operation.
-     */
-    public IndexInitDiscoveryMessage(AbstractIndexOperation op) {
-        super(op);
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public DiscoveryCustomMessage ackMessage() {
-        return new IndexAckDiscoveryMessage(op, errNodeId, errMsg);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean isMutable() {
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void context(GridKernalContext ctx) {
-        this.ctx = ctx;
-    }
-
-    /**
-     * Set error.
-     *
-     * @param errNodeId Error node ID.
-     * @param errMsg Error message.
-     */
-    public void onError(UUID errNodeId, String errMsg) {
-        if (!hasError()) {
-            this.errNodeId = errNodeId;
-            this.errMsg = errMsg;
-        }
-    }
-
-    /**
-     * @return {@code True} if error was reported during init.
-     */
-    public boolean hasError() {
-        return errNodeId != null;
-    }
-
-    /**
-     * @return ID of the node reported an error (if any).
-     */
-    @Nullable public UUID errorNodeId() {
-        return errNodeId;
-    }
-
-    /**
-     * @return Error message (if any).
-     */
-    @Nullable public String errorMessage() {
-        return errMsg;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(IndexInitDiscoveryMessage.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e9ba975/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexProposeDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexProposeDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexProposeDiscoveryMessage.java
new file mode 100644
index 0000000..6d6c72e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexProposeDiscoveryMessage.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.ddl;
+
+import org.apache.ignite.internal.ContextAware;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.UUID;
+
+/**
+ * Propose part of a distributed index create/drop operation.
+ */
+public class IndexProposeDiscoveryMessage extends IndexAbstractDiscoveryMessage implements ContextAware {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    // TODO: Do we really need it?
+    /** Kernal context. */
+    @GridToStringExclude
+    private transient GridKernalContext ctx;
+
+    /** Node reported an error. */
+    private UUID errNodeId;
+
+    /** Error message. */
+    private String errMsg;
+
+    /**
+     * Constructor.
+     *
+     * @param op Operation.
+     */
+    public IndexProposeDiscoveryMessage(AbstractIndexOperation op) {
+        super(op);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+        return hasError() ?
+            new IndexFinishDiscoveryMessage(op, errNodeId, errMsg) :
+            new IndexAcceptDiscoveryMessage(op);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isMutable() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void context(GridKernalContext ctx) {
+        this.ctx = ctx;
+    }
+
+    /**
+     * Set error.
+     *
+     * @param errNodeId Error node ID.
+     * @param errMsg Error message.
+     */
+    public void onError(UUID errNodeId, String errMsg) {
+        if (!hasError()) {
+            this.errNodeId = errNodeId;
+            this.errMsg = errMsg;
+        }
+    }
+
+    /**
+     * @return {@code True} if error was reported during init.
+     */
+    public boolean hasError() {
+        return errNodeId != null;
+    }
+
+    /**
+     * @return ID of the node reported an error (if any).
+     */
+    @Nullable public UUID errorNodeId() {
+        return errNodeId;
+    }
+
+    /**
+     * @return Error message (if any).
+     */
+    @Nullable public String errorMessage() {
+        return errMsg;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IndexProposeDiscoveryMessage.class, this);
+    }
+}