You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/03/14 12:34:48 UTC
ignite git commit: Introduced index finish discovery message to make
sure that the current coordinator always knows the current pending operations.
Repository: ignite
Updated Branches:
refs/heads/ignite-4565-ddl 98e118778 -> 8e9ba9751
Introduced index finish discovery message to make sure that the current coordinator always knows the current pending operations.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8e9ba975
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8e9ba975
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8e9ba975
Branch: refs/heads/ignite-4565-ddl
Commit: 8e9ba97517747f5f74bad9f35b5a25fe36c38bd0
Parents: 98e1187
Author: devozerov <vo...@gridgain.com>
Authored: Tue Mar 14 15:33:54 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Tue Mar 14 15:33:54 2017 +0300
----------------------------------------------------------------------
.../cache/CacheAffinitySharedManager.java | 2 +-
.../cache/DynamicCacheDescriptor.java | 16 +--
.../processors/cache/GridCacheProcessor.java | 76 ++++++-------
.../cache/query/GridCacheQueryManager.java | 6 +-
.../processors/query/GridQueryProcessor.java | 10 +-
.../query/ddl/IndexAcceptDiscoveryMessage.java | 57 ++++++++++
.../query/ddl/IndexAckDiscoveryMessage.java | 88 ---------------
.../query/ddl/IndexFinishDiscoveryMessage.java | 88 +++++++++++++++
.../query/ddl/IndexInitDiscoveryMessage.java | 109 ------------------
.../query/ddl/IndexProposeDiscoveryMessage.java | 111 +++++++++++++++++++
10 files changed, 308 insertions(+), 255 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e9ba975/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index abea10b..d8eb733 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -352,7 +352,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
false,
req.deploymentId());
- desc.indexInitOperation(req.indexInitOperation());
+ desc.indexProposeOperation(req.indexInitOperation());
DynamicCacheDescriptor old = registeredCaches.put(cacheId, desc);
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e9ba975/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index 5ada093..a8febef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -82,8 +82,8 @@ public class DynamicCacheDescriptor {
/** */
private AffinityTopologyVersion rcvdFromVer;
- /** Index operation in init phase. */
- private AbstractIndexOperation idxInitOp;
+ /** Pending index operation in propose phase. */
+ private AbstractIndexOperation idxProposeOp;
/**
* @param ctx Context.
@@ -307,17 +307,17 @@ public class DynamicCacheDescriptor {
}
/**
- * @return Pending index init operation.
+ * @return Pending index propose operation.
*/
- public AbstractIndexOperation indexInitOperation() {
- return idxInitOp;
+ public AbstractIndexOperation indexProposeOperation() {
+ return idxProposeOp;
}
/**
- * @param idxInitOp Pending index init operation.
+ * @param idxProposeOp Pending index propose operation.
*/
- public void indexInitOperation(AbstractIndexOperation idxInitOp) {
- this.idxInitOp = idxInitOp;
+ public void indexProposeOperation(AbstractIndexOperation idxProposeOp) {
+ this.idxProposeOp = idxProposeOp;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e9ba975/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 619d4a4..6a9f838 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -67,8 +67,9 @@ import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.ddl.AbstractIndexOperation;
import org.apache.ignite.internal.processors.query.ddl.IndexAbstractDiscoveryMessage;
-import org.apache.ignite.internal.processors.query.ddl.IndexAckDiscoveryMessage;
-import org.apache.ignite.internal.processors.query.ddl.IndexInitDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.ddl.IndexAcceptDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.ddl.IndexFinishDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.ddl.IndexProposeDiscoveryMessage;
import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteComponentType;
@@ -1984,7 +1985,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
req.cacheType(desc.cacheType());
req.deploymentId(desc.deploymentId());
req.receivedFrom(desc.receivedFrom());
- req.indexInitOperation(desc.indexInitOperation());
+ req.indexInitOperation(desc.indexProposeOperation());
reqs.add(req);
}
@@ -2022,7 +2023,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
req.cacheType(desc.cacheType());
req.deploymentId(desc.deploymentId());
req.receivedFrom(desc.receivedFrom());
- req.indexInitOperation(desc.indexInitOperation());
+ req.indexInitOperation(desc.indexProposeOperation());
reqs.add(req);
@@ -2128,7 +2129,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
ccfg.getCacheMode());
}
- existing.indexInitOperation(req.indexInitOperation());
+ existing.indexProposeOperation(req.indexInitOperation());
}
else {
assert req.cacheType() != null : req;
@@ -2140,7 +2141,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
false,
req.deploymentId());
- desc.indexInitOperation(req.indexInitOperation());
+ desc.indexProposeOperation(req.indexInitOperation());
// Received statically configured cache.
if (req.initiatingNodeId() == null)
@@ -2684,14 +2685,25 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @return {@code True} if minor topology version should be increased.
*/
public boolean onCustomEvent(DiscoveryCustomMessage msg, AffinityTopologyVersion topVer) {
- if (msg instanceof IndexInitDiscoveryMessage) {
- onIndexInitDiscoveryMessage((IndexInitDiscoveryMessage)msg);
+ if (msg instanceof IndexAbstractDiscoveryMessage) {
+ IndexAbstractDiscoveryMessage msg0 = (IndexAbstractDiscoveryMessage)msg;
- return false;
- }
+ IgniteUuid id = msg0.id();
+
+ boolean res = idxDiscoMsgIdHist.add(id);
- if (msg instanceof IndexAckDiscoveryMessage) {
- onIndexAckDiscoveryMessage((IndexAckDiscoveryMessage)msg);
+ // Test the result of the single add() above: adding the same ID again would report every message as a duplicate.
+ if (!res)
+ U.warn(log, "Received duplicate index change discovery message (will ignore): " + msg);
+ else {
+ if (msg instanceof IndexProposeDiscoveryMessage)
+ onIndexProposeMessage((IndexProposeDiscoveryMessage)msg);
+ else if (msg instanceof IndexAcceptDiscoveryMessage)
+ onIndexAcceptMessage((IndexAcceptDiscoveryMessage)msg);
+ else if (msg instanceof IndexFinishDiscoveryMessage)
+ onIndexfinishMessage((IndexFinishDiscoveryMessage)msg);
+ else
+ U.warn(log, "Unsupported index discovery message type (will ignore): " + msg);
+ }
return false;
}
@@ -2707,12 +2719,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
*
* @param msg Message.
*/
- private void onIndexInitDiscoveryMessage(IndexInitDiscoveryMessage msg) {
+ private void onIndexProposeMessage(IndexProposeDiscoveryMessage msg) {
UUID locNodeId = ctx.localNodeId();
- if (!indexMessageValid(msg))
- return;
-
AbstractIndexOperation op = msg.operation();
// Ignore in case error was reported by another node earlier.
@@ -2734,7 +2743,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
// Validate request at descriptor level.
- AbstractIndexOperation oldOp = desc.indexInitOperation();
+ AbstractIndexOperation oldOp = desc.indexProposeOperation();
if (oldOp != null) {
msg.onError(locNodeId, "Failed to create/drop cache index because another pending operation is in " +
@@ -2745,11 +2754,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
// For already started cache we must make sure that indexing manager will be able to accommodate it.
if (!isMissingQueryCache(desc))
- cache(op.space()).context().queries().onIndexInitDiscoveryMessage(msg);
+ cache(op.space()).context().queries().onIndexProposeMessage(msg);
// Finally, set init operation to cache descriptor.
if (!msg.hasError())
- desc.indexInitOperation(op);
+ desc.indexProposeOperation(op);
}
/**
@@ -2757,16 +2766,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
*
* @param msg Message.
*/
- private void onIndexAckDiscoveryMessage(IndexAckDiscoveryMessage msg) {
- if (!indexMessageValid(msg))
- return;
-
- if (msg.hasError()) {
- // TODO: Delegate to indexing to handle error and complete client futures!
-
- return;
- }
-
+ private void onIndexAcceptMessage(IndexAcceptDiscoveryMessage msg) {
// TODO: Remove init operation from descriptor!
// TODO: Handle concurrent cache stop!
@@ -2777,20 +2777,14 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
- * Ensure that arrived discovery message is not duplicate.
+ * Handle cache index finish discovery message.
*
* @param msg Message.
- * @return {@code True} if message is not duplicated and should be processed further.
*/
- private boolean indexMessageValid(IndexAbstractDiscoveryMessage msg) {
- IgniteUuid id = msg.id();
-
- boolean res = idxDiscoMsgIdHist.add(id);
+ private void onIndexfinishMessage(IndexFinishDiscoveryMessage msg) {
+ // TODO: Clear dynamic descriptors!
- if (!res)
- U.warn(log, "Received duplicate index change discovery message (will ignore): " + msg);
-
- return res;
+ // TODO: Delegate to indexing to handle result and complete client futures!
}
/**
@@ -2862,7 +2856,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
DynamicCacheDescriptor startDesc =
new DynamicCacheDescriptor(ctx, ccfg, req.cacheType(), false, req.deploymentId());
- startDesc.indexInitOperation(req.indexInitOperation());
+ startDesc.indexProposeOperation(req.indexInitOperation());
if (newTopVer == null) {
newTopVer = new AffinityTopologyVersion(topVer.topologyVersion(),
@@ -3926,7 +3920,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
req.deploymentId(desc.deploymentId());
req.startCacheConfiguration(descCfg);
- req.indexInitOperation(desc.indexInitOperation());
+ req.indexInitOperation(desc.indexProposeOperation());
}
}
else {
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e9ba975/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index c7d5feb..fe53a1a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -86,7 +86,7 @@ import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.processors.query.QueryUtils;
-import org.apache.ignite.internal.processors.query.ddl.IndexInitDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.ddl.IndexProposeDiscoveryMessage;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.GridBoundedPriorityQueue;
import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
@@ -541,8 +541,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
*
* @param msg Message.
*/
- public void onIndexInitDiscoveryMessage(IndexInitDiscoveryMessage msg) {
- qryProc.onIndexInitDiscoveryMessage(space, msg);
+ public void onIndexProposeMessage(IndexProposeDiscoveryMessage msg) {
+ qryProc.onIndexProposeMessage(space, msg);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e9ba975/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index eb984c9..d0d9374 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -59,8 +59,8 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
import org.apache.ignite.internal.processors.query.ddl.AbstractIndexOperation;
import org.apache.ignite.internal.processors.query.ddl.CreateIndexOperation;
import org.apache.ignite.internal.processors.query.ddl.DropIndexOperation;
-import org.apache.ignite.internal.processors.query.ddl.IndexAckDiscoveryMessage;
-import org.apache.ignite.internal.processors.query.ddl.IndexInitDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.ddl.IndexAcceptDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.ddl.IndexProposeDiscoveryMessage;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -291,7 +291,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
* @param space Space.
* @param msg Message.
*/
- public void onIndexInitDiscoveryMessage(String space, IndexInitDiscoveryMessage msg) {
+ public void onIndexProposeMessage(String space, IndexProposeDiscoveryMessage msg) {
// Validate.
idxLock.writeLock().lock();
@@ -372,7 +372,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
*
* @param msg Message.
*/
- private void onIndexAckDiscoveryMessage(String space, IndexAckDiscoveryMessage msg) {
+ private void onIndexAckDiscoveryMessage(String space, IndexAcceptDiscoveryMessage msg) {
// TODO
}
@@ -803,7 +803,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
ifNotExists);
try {
- ctx.discovery().sendCustomEvent(new IndexInitDiscoveryMessage(op));
+ ctx.discovery().sendCustomEvent(new IndexProposeDiscoveryMessage(op));
}
catch (IgniteCheckedException e) {
return new GridFinishedFuture<>(new IgniteException("Failed to start index create opeartion due to " +
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e9ba975/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAcceptDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAcceptDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAcceptDiscoveryMessage.java
new file mode 100644
index 0000000..d0fed43
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAcceptDiscoveryMessage.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.ddl;
+
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.UUID;
+
+/**
+ * Accept message for index create/drop; a propose message returns it as its ack when no error was reported.
+ */
+public class IndexAcceptDiscoveryMessage extends IndexAbstractDiscoveryMessage {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Creates accept message for the given operation.
+ *
+ * @param op Original operation.
+ */
+ public IndexAcceptDiscoveryMessage(AbstractIndexOperation op) {
+ super(op);
+ }
+
+ /** {@inheritDoc} Always {@code null} for this message type. */
+ @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+ return null;
+ }
+
+ /** {@inheritDoc} Always {@code false} for this message type. */
+ @Override public boolean isMutable() {
+ return false;
+ }
+
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IndexAcceptDiscoveryMessage.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e9ba975/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAckDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAckDiscoveryMessage.java
deleted file mode 100644
index 96b6dcd..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAckDiscoveryMessage.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.ddl;
-
-import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.jetbrains.annotations.Nullable;
-
-import java.util.UUID;
-
-/**
- * {@code ACK} message which triggers local index create/drop.
- */
-public class IndexAckDiscoveryMessage extends IndexAbstractDiscoveryMessage {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Node reported an error. */
- private final UUID errNodeId;
-
- /** Error message. */
- private final String errMsg;
-
- /**
- * Constructor.
- *
- * @param op Original operation.
- * @param errNodeId Node reported an error.
- * @param errMsg Error message.
- */
- public IndexAckDiscoveryMessage(AbstractIndexOperation op, UUID errNodeId, String errMsg) {
- super(op);
-
- this.errNodeId = errNodeId;
- this.errMsg = errMsg;
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public DiscoveryCustomMessage ackMessage() {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public boolean isMutable() {
- return false;
- }
-
- /**
- * @return {@code True} if error was reported during init.
- */
- public boolean hasError() {
- return errNodeId != null;
- }
-
- /**
- * @return ID of the node reported an error (if any).
- */
- @Nullable public UUID errorNodeId() {
- return errNodeId;
- }
-
- /**
- * @return Error message (if any).
- */
- @Nullable public String errorMessage() {
- return errMsg;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(IndexAckDiscoveryMessage.class, this);
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e9ba975/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexFinishDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexFinishDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexFinishDiscoveryMessage.java
new file mode 100644
index 0000000..5a2d66c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexFinishDiscoveryMessage.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.ddl;
+
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.UUID;
+
+/**
+ * Finish message of an index create/drop operation; carries the error reported for it, if any.
+ */
+public class IndexFinishDiscoveryMessage extends IndexAbstractDiscoveryMessage {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** ID of the node that reported an error. */
+ private final UUID errNodeId;
+
+ /** Error message. */
+ private final String errMsg;
+ /**
+ * Creates finish message for the given operation.
+ *
+ * @param op Original operation.
+ * @param errNodeId ID of the node that reported an error.
+ * @param errMsg Error message.
+ */
+ public IndexFinishDiscoveryMessage(AbstractIndexOperation op, UUID errNodeId, String errMsg) {
+ super(op);
+
+ this.errNodeId = errNodeId;
+ this.errMsg = errMsg;
+ }
+
+ /** {@inheritDoc} Always {@code null} for this message type. */
+ @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+ return null;
+ }
+
+ /** {@inheritDoc} Always {@code false} for this message type. */
+ @Override public boolean isMutable() {
+ return false;
+ }
+
+ /**
+ * @return {@code True} if this message carries an error (error node ID is set).
+ */
+ public boolean hasError() {
+ return errNodeId != null;
+ }
+
+ /**
+ * @return ID of the node that reported an error (if any).
+ */
+ @Nullable public UUID errorNodeId() {
+ return errNodeId;
+ }
+
+ /**
+ * @return Error message (if any).
+ */
+ @Nullable public String errorMessage() {
+ return errMsg;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IndexFinishDiscoveryMessage.class, this);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e9ba975/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexInitDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexInitDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexInitDiscoveryMessage.java
deleted file mode 100644
index e47c901..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexInitDiscoveryMessage.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.ddl;
-
-import org.apache.ignite.internal.ContextAware;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.jetbrains.annotations.Nullable;
-
-import java.util.UUID;
-
-/**
- * {@code INIT} part of a distributed index create/drop operation.
- */
-public class IndexInitDiscoveryMessage extends IndexAbstractDiscoveryMessage implements ContextAware {
- /** */
- private static final long serialVersionUID = 0L;
-
- // TODO: Do we really need it?
- /** Kernal context. */
- @GridToStringExclude
- private transient GridKernalContext ctx;
-
- /** Node reported an error. */
- private UUID errNodeId;
-
- /** Error message. */
- private String errMsg;
-
- /**
- * Constructor.
- *
- * @param op Operation.
- */
- public IndexInitDiscoveryMessage(AbstractIndexOperation op) {
- super(op);
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public DiscoveryCustomMessage ackMessage() {
- return new IndexAckDiscoveryMessage(op, errNodeId, errMsg);
- }
-
- /** {@inheritDoc} */
- @Override public boolean isMutable() {
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public void context(GridKernalContext ctx) {
- this.ctx = ctx;
- }
-
- /**
- * Set error.
- *
- * @param errNodeId Error node ID.
- * @param errMsg Error message.
- */
- public void onError(UUID errNodeId, String errMsg) {
- if (!hasError()) {
- this.errNodeId = errNodeId;
- this.errMsg = errMsg;
- }
- }
-
- /**
- * @return {@code True} if error was reported during init.
- */
- public boolean hasError() {
- return errNodeId != null;
- }
-
- /**
- * @return ID of the node reported an error (if any).
- */
- @Nullable public UUID errorNodeId() {
- return errNodeId;
- }
-
- /**
- * @return Error message (if any).
- */
- @Nullable public String errorMessage() {
- return errMsg;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(IndexInitDiscoveryMessage.class, this);
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e9ba975/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexProposeDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexProposeDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexProposeDiscoveryMessage.java
new file mode 100644
index 0000000..6d6c72e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexProposeDiscoveryMessage.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.ddl;
+
+import org.apache.ignite.internal.ContextAware;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.UUID;
+
+/**
+ * Propose part of a distributed index create/drop operation; mutable, so an error can be recorded on it.
+ */
+public class IndexProposeDiscoveryMessage extends IndexAbstractDiscoveryMessage implements ContextAware {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ // TODO: Do we really need it?
+ /** Kernal context (transient, set through {@link #context(GridKernalContext)}). */
+ @GridToStringExclude
+ private transient GridKernalContext ctx;
+
+ /** ID of the node that reported an error. */
+ private UUID errNodeId;
+
+ /** Error message. */
+ private String errMsg;
+
+ /**
+ * Creates propose message for the given operation.
+ *
+ * @param op Operation.
+ */
+ public IndexProposeDiscoveryMessage(AbstractIndexOperation op) {
+ super(op);
+ }
+
+ /** {@inheritDoc} Finish message carrying the error if one was recorded, accept message otherwise. */
+ @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+ return hasError() ?
+ new IndexFinishDiscoveryMessage(op, errNodeId, errMsg) :
+ new IndexAcceptDiscoveryMessage(op);
+ }
+
+ /** {@inheritDoc} Always {@code true} for this message type. */
+ @Override public boolean isMutable() {
+ return true;
+ }
+
+ /** {@inheritDoc} Stores the given context in this message. */
+ @Override public void context(GridKernalContext ctx) {
+ this.ctx = ctx;
+ }
+
+ /**
+ * Records an error unless one was already recorded (the first error wins).
+ *
+ * @param errNodeId Error node ID.
+ * @param errMsg Error message.
+ */
+ public void onError(UUID errNodeId, String errMsg) {
+ if (!hasError()) {
+ this.errNodeId = errNodeId;
+ this.errMsg = errMsg;
+ }
+ }
+
+ /**
+ * @return {@code True} if an error was recorded (error node ID is set).
+ */
+ public boolean hasError() {
+ return errNodeId != null;
+ }
+
+ /**
+ * @return ID of the node that reported an error (if any).
+ */
+ @Nullable public UUID errorNodeId() {
+ return errNodeId;
+ }
+
+ /**
+ * @return Error message (if any).
+ */
+ @Nullable public String errorMessage() {
+ return errMsg;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IndexProposeDiscoveryMessage.class, this);
+ }
+}