You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/03/14 09:24:18 UTC
ignite git commit: WIP on correct discovery data sharing.
Repository: ignite
Updated Branches:
refs/heads/ignite-4565-ddl f8a5fef56 -> 4a5bb9375
WIP on correct discovery data sharing.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4a5bb937
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4a5bb937
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4a5bb937
Branch: refs/heads/ignite-4565-ddl
Commit: 4a5bb9375556a0b141b71b5431f3b291e1287a3e
Parents: f8a5fef
Author: devozerov <vo...@gridgain.com>
Authored: Tue Mar 14 12:24:09 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Tue Mar 14 12:24:09 2017 +0300
----------------------------------------------------------------------
.../cache/DynamicCacheDescriptor.java | 18 +++
.../processors/cache/GridCacheProcessor.java | 138 ++++++++++++++++++-
.../cache/query/GridCacheQueryManager.java | 12 +-
.../processors/query/GridQueryProcessor.java | 43 +-----
.../query/ddl/AbstractIndexOperation.java | 14 +-
.../query/ddl/CreateIndexOperation.java | 13 +-
.../query/ddl/DropIndexOperation.java | 13 +-
.../ddl/IndexAbstractDiscoveryMessage.java | 2 +
.../query/ddl/IndexAckDiscoveryMessage.java | 29 ++++
.../processors/query/h2/IgniteH2Indexing.java | 2 +-
10 files changed, 211 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a5bb937/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index 8b62f03..5ada093 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -24,6 +24,7 @@ import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.plugin.CachePluginManager;
+import org.apache.ignite.internal.processors.query.ddl.AbstractIndexOperation;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -81,6 +82,9 @@ public class DynamicCacheDescriptor {
/** */
private AffinityTopologyVersion rcvdFromVer;
+ /** Index operation in init phase. */
+ private AbstractIndexOperation idxInitOp;
+
/**
* @param ctx Context.
* @param cacheCfg Cache configuration.
@@ -302,6 +306,20 @@ public class DynamicCacheDescriptor {
return rcvdFrom;
}
+ /**
+ * Get the index operation currently stored on this descriptor.
+ *
+ * @return Pending index init operation, or {@code null} if none has been set.
+ */
+ public AbstractIndexOperation indexInitOperation() {
+ return idxInitOp;
+ }
+
+ /**
+ * Store the given index operation on this descriptor, replacing whatever was stored before.
+ * No validation is performed here; callers are expected to check for an existing operation first.
+ *
+ * @param idxInitOp Pending index init operation.
+ */
+ public void indexInitOperation(AbstractIndexOperation idxInitOp) {
+ this.idxInitOp = idxInitOp;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(DynamicCacheDescriptor.class, this, "cacheName", U.maskName(cacheCfg.getName()));
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a5bb937/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 77702f0..9b33df0 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -65,6 +65,12 @@ import org.apache.ignite.configuration.TransactionConfiguration;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.processors.query.ddl.AbstractIndexOperation;
+import org.apache.ignite.internal.processors.query.ddl.CreateIndexOperation;
+import org.apache.ignite.internal.processors.query.ddl.DropIndexOperation;
+import org.apache.ignite.internal.processors.query.ddl.IndexAbstractDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.ddl.IndexAckDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.ddl.IndexInitDiscoveryMessage;
import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteComponentType;
@@ -104,6 +110,7 @@ import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProces
import org.apache.ignite.internal.processors.plugin.CachePluginManager;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.F0;
+import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -205,6 +212,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** */
private Map<UUID, DynamicCacheChangeBatch> clientReconnectReqs;
+ /** ID history for index create/drop discovery messages. */
+ private final GridBoundedConcurrentLinkedHashSet<IgniteUuid> idxDiscoMsgIdHist =
+ new GridBoundedConcurrentLinkedHashSet<>(QueryUtils.discoveryHistorySize());
+
/**
* @param ctx Kernal context.
*/
@@ -1075,6 +1086,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
private void startCache(GridCacheAdapter<?, ?> cache) throws IgniteCheckedException {
GridCacheContext<?, ?> cacheCtx = cache.context();
+ // TODO: Make sure that pending init operation is passed to indexing.
ctx.query().onCacheStart(cacheCtx);
ctx.continuous().onCacheStart(cacheCtx);
@@ -2532,7 +2544,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (cache == null)
return new GridFinishedFuture<>(new IgniteException("Cache doesn't exist: " + cacheName));
- return cache.context().queries().createIndex(tblName, idx, ifNotExists);
+ return cache.context().queries().dynamicIndexCreate(tblName, idx, ifNotExists);
}
/**
@@ -2673,6 +2685,18 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @return {@code True} if minor topology version should be increased.
*/
public boolean onCustomEvent(DiscoveryCustomMessage msg, AffinityTopologyVersion topVer) {
+ if (msg instanceof IndexInitDiscoveryMessage) {
+ onIndexInitDiscoveryMessage((IndexInitDiscoveryMessage)msg);
+
+ return false;
+ }
+
+ if (msg instanceof IndexAckDiscoveryMessage) {
+ onIndexAckDiscoveryMessage((IndexAckDiscoveryMessage)msg);
+
+ return false;
+ }
+
if (msg instanceof CacheAffinityChangeMessage)
return sharedCtx.affinity().onCustomEvent(((CacheAffinityChangeMessage)msg));
@@ -2680,6 +2704,98 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
+  * Handle cache index init discovery message.
+  * <p>
+  * Processing steps, in order:
+  * <ol>
+  *     <li>drop the message if its ID was already seen;</li>
+  *     <li>ignore the message if another node already reported an error on it;</li>
+  *     <li>report an error on the message if the target cache has no descriptor on this node;</li>
+  *     <li>report an error on the message if the descriptor already holds a pending index operation;</li>
+  *     <li>for a cache that is started locally, let its query manager process the message;</li>
+  *     <li>if the message still carries no error, store the operation on the cache descriptor.</li>
+  * </ol>
+  *
+  * @param msg Message.
+  */
+ private void onIndexInitDiscoveryMessage(IndexInitDiscoveryMessage msg) {
+     UUID locNodeId = ctx.localNodeId();
+
+     if (!indexMessageValid(msg))
+         return;
+
+     AbstractIndexOperation op = msg.operation();
+
+     // Ignore in case error was reported by another node earlier.
+     if (msg.hasError()) {
+         if (log.isDebugEnabled())
+             log.debug("Received index init message with error reported by another node (will ignore): " + msg);
+
+         return;
+     }
+
+     // Ensure cache exists.
+     DynamicCacheDescriptor desc = cacheDescriptor(op.space());
+
+     if (desc == null) {
+         msg.onError(locNodeId, "Cache doesn't exist on node [cacheName=" + op.space() +
+             ", nodeId=" + locNodeId + ']');
+
+         return;
+     }
+
+     // Validate request on descriptor level.
+     AbstractIndexOperation oldOp = desc.indexInitOperation();
+
+     if (oldOp != null) {
+         msg.onError(locNodeId, "Failed to create/drop cache index because another pending operation is in " +
+             "progress [cacheName=" + op.space() + ", newOp=" + op + ", oldOp=" + oldOp + ']');
+
+         return;
+     }
+
+     if (op instanceof CreateIndexOperation) {
+         CreateIndexOperation op0 = (CreateIndexOperation)op;
+
+         // TODO
+     }
+     else if (op instanceof DropIndexOperation) {
+         DropIndexOperation op0 = (DropIndexOperation)op;
+
+         // TODO
+     }
+     else
+         U.warn(log, "Received index init message with unsupported operation (will ignore): " + msg);
+
+     // For already started cache we must make sure that indexing manager will be able to accommodate it.
+     // The check is "cache is started", not "cache is missing": a cache that is not started has no
+     // local instance to delegate to (cache(...) is presumably null for it - confirm against cache()).
+     if (caches.containsKey(maskNull(op.space())))
+         cache(op.space()).context().queries().onIndexInitDiscoveryMessage(msg);
+
+     // Finally, set init operation to cache descriptor.
+     if (!msg.hasError())
+         desc.indexInitOperation(op);
+ }
+
+ /**
+ * Handle cache index ack discovery message.
+ * <p>
+ * At this point only duplicate messages are filtered out; the actual ack processing
+ * is not implemented yet (see the TODO in the body).
+ *
+ * @param msg Message.
+ */
+ private void onIndexAckDiscoveryMessage(IndexAckDiscoveryMessage msg) {
+ if (!indexMessageValid(msg))
+ return;
+
+ // TODO
+ }
+
+ /**
+  * Check that the given index discovery message arrives for the first time.
+  * <p>
+  * The message ID is added to the ID history set; if the set already holds it, the message is
+  * a duplicate and a warning is logged.
+  *
+  * @param msg Message.
+  * @return {@code True} if message is not duplicated and should be processed further.
+  */
+ private boolean indexMessageValid(IndexAbstractDiscoveryMessage msg) {
+     // add() returning true means the ID was not in the history yet.
+     if (idxDiscoMsgIdHist.add(msg.id()))
+         return true;
+
+     U.warn(log, "Received duplicate index change discovery message (will ignore): " + msg);
+
+     return false;
+ }
+
+ /**
* @param batch Change request batch.
* @param topVer Current topology version.
* @return {@code True} if minor topology version should be increased.
@@ -3569,16 +3685,28 @@ public class GridCacheProcessor extends GridProcessorAdapter {
*
* @throws IgniteCheckedException In case of error.
*/
- public void createMissingCaches() throws IgniteCheckedException {
+ public void createMissingQueryCaches() throws IgniteCheckedException {
for (Map.Entry<String, DynamicCacheDescriptor> e : registeredCaches.entrySet()) {
- CacheConfiguration ccfg = e.getValue().cacheConfiguration();
+ DynamicCacheDescriptor desc = e.getValue();
- if (!caches.containsKey(maskNull(ccfg.getName())) && QueryUtils.isEnabled(ccfg))
- dynamicStartCache(null, ccfg.getName(), null, false, true, true).get();
+ if (isMissingQueryCache(desc))
+ dynamicStartCache(null, desc.cacheConfiguration().getName(), null, false, true, true).get();
}
}
/**
+  * Tell whether the cache described by the given descriptor still has to be started for queries,
+  * i.e. it is absent from the started caches map while its configuration has queries enabled.
+  *
+  * @param desc Descriptor.
+  * @return {@code True} if this is missing query cache.
+  */
+ private boolean isMissingQueryCache(DynamicCacheDescriptor desc) {
+     CacheConfiguration cfg = desc.cacheConfiguration();
+
+     // Cache is already started, so it cannot be missing.
+     if (caches.containsKey(maskNull(cfg.getName())))
+         return false;
+
+     return QueryUtils.isEnabled(cfg);
+ }
+
+ /**
* Registers MBean for cache components.
*
* @param o Cache component.
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a5bb937/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 30858f9..c7d5feb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -86,6 +86,7 @@ import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.processors.query.ddl.IndexInitDiscoveryMessage;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.GridBoundedPriorityQueue;
import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
@@ -518,7 +519,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
* @param ifNotExists When set to {@code true} operation will fail if index already exists.
* @return Future completed when index is created.
*/
- public IgniteInternalFuture<?> createIndex(String tblName, QueryIndex idx, boolean ifNotExists) {
+ public IgniteInternalFuture<?> dynamicIndexCreate(String tblName, QueryIndex idx, boolean ifNotExists) {
if (!enterBusy())
return new GridFinishedFuture<>(new IgniteException("Failed to create index because " +
"local node is stopping."));
@@ -536,6 +537,15 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
}
/**
+ * Pass index init discovery message on to the query processor, along with {@code space}.
+ * This method does nothing else itself; any validation happens in the query processor.
+ *
+ * @param msg Message.
+ */
+ public void onIndexInitDiscoveryMessage(IndexInitDiscoveryMessage msg) {
+ qryProc.onIndexInitDiscoveryMessage(space, msg);
+ }
+
+ /**
* Undeploys given class loader.
*
* @param ldr Class loader to undeploy.
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a5bb937/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index a65b318..b8fa36f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -42,12 +42,10 @@ import org.apache.ignite.cache.QueryIndex;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.SqlQuery;
-import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.events.CacheQueryExecutedEvent;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.managers.discovery.CustomEventListener;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
@@ -65,7 +63,6 @@ import org.apache.ignite.internal.processors.query.ddl.IndexAbstractDiscoveryMes
import org.apache.ignite.internal.processors.query.ddl.IndexAckDiscoveryMessage;
import org.apache.ignite.internal.processors.query.ddl.IndexInitDiscoveryMessage;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
-import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
@@ -113,10 +110,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
/** Index create/drop client futures. */
private final ConcurrentMap<UUID, QueryIndexClientFuture> idxCliFuts = new ConcurrentHashMap<>();
- /** ID history for index create/drop discovery messages. */
- private final GridBoundedConcurrentLinkedHashSet<IgniteUuid> idxDiscoMsgIdHist =
- new GridBoundedConcurrentLinkedHashSet<>(QueryUtils.discoveryHistorySize());
-
/** */
private GridTimeoutProcessor.CancelableTask qryDetailMetricsEvictTask;
@@ -148,23 +141,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
idx.start(ctx, busyLock);
}
- ctx.discovery().setCustomEventListener(IndexAbstractDiscoveryMessage.class,
- new CustomEventListener<IndexAbstractDiscoveryMessage>() {
- /** {@inheritDoc} */
- @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "unchecked"})
- @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd,
- IndexAbstractDiscoveryMessage msg) {
- if (notDuplicate(msg)) {
- if (msg instanceof IndexInitDiscoveryMessage)
- onIndexInitDiscoveryMessage((IndexInitDiscoveryMessage)msg);
- if (msg instanceof IndexAckDiscoveryMessage)
- onIndexAckDiscoveryMessage((IndexAckDiscoveryMessage)msg);
- else
- U.warn(log, "Unexpected custom discovery message [msg=" + msg + ']');
- }
- }
- });
-
// Schedule queries detail metrics eviction.
qryDetailMetricsEvictTask = ctx.timeout().schedule(new Runnable() {
@Override public void run() {
@@ -314,13 +290,10 @@ public class GridQueryProcessor extends GridProcessorAdapter {
/**
* Handle index init discovery message.
*
+ * @param space Space.
* @param msg Message.
*/
- private void onIndexInitDiscoveryMessage(IndexInitDiscoveryMessage msg) {
- // Return message with existing error.
- if (msg.hasError())
- return;
-
+ public void onIndexInitDiscoveryMessage(String space, IndexInitDiscoveryMessage msg) {
// Validate.
idxLock.writeLock().lock();
@@ -1173,18 +1146,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
/**
- * Ensuret that arrived discovery message is not duplicate.
- *
- * @param msg Message.
- * @return {@code True} if message is not duplicated and should be processed further.
- */
- private boolean notDuplicate(IndexAbstractDiscoveryMessage msg) {
- IgniteUuid id = msg.id();
-
- return idxDiscoMsgIdHist.add(id);
- }
-
- /**
* @param ver Version.
*/
public static void setRequestAffinityTopologyVersion(AffinityTopologyVersion ver) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a5bb937/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/AbstractIndexOperation.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/AbstractIndexOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/AbstractIndexOperation.java
index cfa8f24..24600e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/AbstractIndexOperation.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/AbstractIndexOperation.java
@@ -33,15 +33,20 @@ public abstract class AbstractIndexOperation implements Serializable {
/** Operation ID. */
private final UUID opId;
+ /** Space. */
+ private final String space;
+
/**
* Constructor.
*
* @param cliNodeId Client node ID.
* @param opId Operation ID.
+ * @param space Space.
*/
- public AbstractIndexOperation(UUID cliNodeId, UUID opId) {
+ public AbstractIndexOperation(UUID cliNodeId, UUID opId, String space) {
this.cliNodeId = cliNodeId;
this.opId = opId;
+ this.space = space;
}
/**
@@ -57,4 +62,11 @@ public abstract class AbstractIndexOperation implements Serializable {
public UUID operationId() {
return opId;
}
+
+ /**
+ * @return Space this operation was created for, exactly as passed to the constructor.
+ */
+ public String space() {
+ return space;
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a5bb937/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/CreateIndexOperation.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/CreateIndexOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/CreateIndexOperation.java
index 125d0e2..7030586 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/CreateIndexOperation.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/CreateIndexOperation.java
@@ -30,9 +30,6 @@ public class CreateIndexOperation extends AbstractIndexOperation {
/** */
private static final long serialVersionUID = 0L;
- /** Space. */
- private final String space;
-
/** Table name. */
private final String tblName;
@@ -55,9 +52,8 @@ public class CreateIndexOperation extends AbstractIndexOperation {
*/
public CreateIndexOperation(UUID cliNodeId, UUID opId, String space, String tblName, QueryIndex idx,
boolean ifNotExists) {
- super(cliNodeId, opId);
+ super(cliNodeId, opId, space);
- this.space = space;
this.tblName = tblName;
this.idx = idx;
this.ifNotExists = ifNotExists;
@@ -71,13 +67,6 @@ public class CreateIndexOperation extends AbstractIndexOperation {
}
/**
- * @return Schema name.
- */
- public String space() {
- return space;
- }
-
- /**
* @return Table name.
*/
public String tableName() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a5bb937/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/DropIndexOperation.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/DropIndexOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/DropIndexOperation.java
index 14c3ffd..be2941c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/DropIndexOperation.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/DropIndexOperation.java
@@ -28,9 +28,6 @@ public class DropIndexOperation extends AbstractIndexOperation {
/** */
private static final long serialVersionUID = 0L;
- /** Space. */
- private final String space;
-
/** Index name. */
private final String idxName;
@@ -47,21 +44,13 @@ public class DropIndexOperation extends AbstractIndexOperation {
* @param ifExists Ignore operation if index doesn't exist.
*/
DropIndexOperation(UUID cliNodeId, UUID opId, String space, String idxName, boolean ifExists) {
- super(cliNodeId, opId);
+ super(cliNodeId, opId, space);
- this.space = space;
this.idxName = idxName;
this.ifExists = ifExists;
}
/**
- * @return Space.
- */
- public String space() {
- return space;
- }
-
- /**
* @return Index name.
*/
public String indexName() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a5bb937/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java
index b7599e6..2912986 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.query.ddl;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
@@ -32,6 +33,7 @@ public abstract class IndexAbstractDiscoveryMessage implements DiscoveryCustomMe
private final IgniteUuid id = IgniteUuid.randomUuid();
/** Operation. */
+ @GridToStringInclude
private final AbstractIndexOperation op;
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a5bb937/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAckDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAckDiscoveryMessage.java
index 44a89fc..4fb224d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAckDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAckDiscoveryMessage.java
@@ -21,6 +21,8 @@ import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
+import java.util.UUID;
+
/**
* {@code ACK} message which triggers local index create/drop.
*/
@@ -28,6 +30,12 @@ public class IndexAckDiscoveryMessage extends IndexAbstractDiscoveryMessage {
/** */
private static final long serialVersionUID = 0L;
+ /** Node reported an error. */
+ private UUID errNodeId;
+
+ /** Error message. */
+ private String errMsg;
+
/**
* Constructor.
*
@@ -47,6 +55,27 @@ public class IndexAckDiscoveryMessage extends IndexAbstractDiscoveryMessage {
return false;
}
+ /**
+ * An error is considered reported when the error node ID is set.
+ * NOTE(review): nothing in the visible part of this class assigns the error node ID or
+ * the error message - confirm they are populated elsewhere.
+ *
+ * @return {@code True} if error was reported during init.
+ */
+ public boolean hasError() {
+ return errNodeId != null;
+ }
+
+ /**
+ * @return ID of the node that reported an error, or {@code null} if there is none.
+ */
+ @Nullable public UUID errorNodeId() {
+ return errNodeId;
+ }
+
+ /**
+ * @return Error message, or {@code null} if there is none.
+ */
+ @Nullable public String errorMessage() {
+ return errMsg;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(IndexAckDiscoveryMessage.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a5bb937/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 719897a..bfe51a2 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1449,7 +1449,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
catch (SQLException e) {
if (!cachesCreated && e.getErrorCode() == ErrorCode.SCHEMA_NOT_FOUND_1) {
try {
- ctx.cache().createMissingCaches();
+ ctx.cache().createMissingQueryCaches();
}
catch (IgniteCheckedException ignored) {
throw new CacheException("Failed to create missing caches.", e);