You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/04/20 08:39:41 UTC
[34/71] [abbrv] ignite git commit: IGNITE-4565: Implemented CREATE
INDEX and DROP INDEX. This closes #1773. This closes #1804.
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryTypeDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryTypeDescriptorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryTypeDescriptorImpl.java
index 7677d0d..119a389 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryTypeDescriptorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryTypeDescriptorImpl.java
@@ -23,7 +23,10 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -33,6 +36,9 @@ import java.util.Map;
* Descriptor of type.
*/
public class QueryTypeDescriptorImpl implements GridQueryTypeDescriptor {
+ /** Space. */
+ private final String space;
+
/** */
private String name;
@@ -50,9 +56,15 @@ public class QueryTypeDescriptorImpl implements GridQueryTypeDescriptor {
/** Map with upper cased property names to help find properties based on SQL INSERT and MERGE queries. */
private final Map<String, GridQueryProperty> uppercaseProps = new HashMap<>();
+ /** Mutex for operations on indexes. */
+ private final Object idxMux = new Object();
+
/** */
@GridToStringInclude
- private final Map<String, QueryIndexDescriptorImpl> indexes = new HashMap<>();
+ private final Map<String, QueryIndexDescriptorImpl> idxs = new HashMap<>();
+
+ /** Aliases. */
+ private Map<String, String> aliases;
/** */
private QueryIndexDescriptorImpl fullTextIdx;
@@ -78,6 +90,25 @@ public class QueryTypeDescriptorImpl implements GridQueryTypeDescriptor {
/** */
private String affKey;
+ /** Obsolete. */
+ private volatile boolean obsolete;
+
+ /**
+ * Constructor.
+ *
+ * @param space Cache name.
+ */
+ public QueryTypeDescriptorImpl(String space) {
+ this.space = space;
+ }
+
+ /**
+ * @return Space.
+ */
+ public String space() {
+ return space;
+ }
+
/** {@inheritDoc} */
@Override public String name() {
return name;
@@ -97,7 +128,7 @@ public class QueryTypeDescriptorImpl implements GridQueryTypeDescriptor {
* @return Table name.
*/
@Override public String tableName() {
- return tblName;
+ return tblName != null ? tblName : name;
}
/**
@@ -160,7 +191,9 @@ public class QueryTypeDescriptorImpl implements GridQueryTypeDescriptor {
/** {@inheritDoc} */
@Override public Map<String, GridQueryIndexDescriptor> indexes() {
- return Collections.<String, GridQueryIndexDescriptor>unmodifiableMap(indexes);
+ synchronized (idxMux) {
+ return Collections.<String, GridQueryIndexDescriptor>unmodifiableMap(idxs);
+ }
}
/** {@inheritDoc} */
@@ -176,59 +209,74 @@ public class QueryTypeDescriptorImpl implements GridQueryTypeDescriptor {
}
/**
- * Adds index.
+ * Get index by name.
*
- * @param idxName Index name.
- * @param type Index type.
- * @param inlineSize Inline size.
- * @return Index descriptor.
- * @throws IgniteCheckedException In case of error.
+ * @param name Name.
+ * @return Index.
*/
- public QueryIndexDescriptorImpl addIndex(String idxName, QueryIndexType type, int inlineSize) throws IgniteCheckedException {
- QueryIndexDescriptorImpl idx = new QueryIndexDescriptorImpl(type, inlineSize);
+ @Nullable public QueryIndexDescriptorImpl index(String name) {
+ synchronized (idxMux) {
+ return idxs.get(name);
+ }
+ }
- if (indexes.put(idxName, idx) != null)
- throw new IgniteCheckedException("Index with name '" + idxName + "' already exists.");
+ /**
+ * @return Raw index descriptors.
+ */
+ public Collection<QueryIndexDescriptorImpl> indexes0() {
+ synchronized (idxMux) {
+ return new ArrayList<>(idxs.values());
+ }
+ }
- return idx;
+ /** {@inheritDoc} */
+ @Override public GridQueryIndexDescriptor textIndex() {
+ return fullTextIdx;
}
/**
- * Adds field to index.
+ * Add index.
*
- * @param idxName Index name.
- * @param field Field name.
- * @param orderNum Fields order number in index.
- * @param inlineSize Inline size.
- * @param descending Sorting order.
+ * @param idx Index.
* @throws IgniteCheckedException If failed.
*/
- public void addFieldToIndex(
- String idxName,
- String field,
- int orderNum,
- int inlineSize,
- boolean descending
- ) throws IgniteCheckedException {
- QueryIndexDescriptorImpl desc = indexes.get(idxName);
+ public void addIndex(QueryIndexDescriptorImpl idx) throws IgniteCheckedException {
+ synchronized (idxMux) {
+ if (idxs.put(idx.name(), idx) != null)
+ throw new IgniteCheckedException("Index with name '" + idx.name() + "' already exists.");
+ }
+ }
- if (desc == null)
- desc = addIndex(idxName, QueryIndexType.SORTED, inlineSize);
+ /**
+ * Drop index.
+ *
+ * @param idxName Index name.
+ */
+ public void dropIndex(String idxName) {
+ synchronized (idxMux) {
+ idxs.remove(idxName);
+ }
+ }
- desc.addField(field, orderNum, descending);
+ /**
+ * Check if particular field exists.
+ *
+ * @param field Field.
+ * @return {@code True} if exists.
+ */
+ public boolean hasField(String field) {
+ return props.containsKey(field) || QueryUtils._VAL.equalsIgnoreCase(field);
}
/**
* Adds field to text index.
*
* @param field Field name.
+ * @throws IgniteCheckedException If failed.
*/
- public void addFieldToTextIndex(String field) {
- if (fullTextIdx == null) {
- fullTextIdx = new QueryIndexDescriptorImpl(QueryIndexType.FULLTEXT, 0);
-
- indexes.put(null, fullTextIdx);
- }
+ public void addFieldToTextIndex(String field) throws IgniteCheckedException {
+ if (fullTextIdx == null)
+ fullTextIdx = new QueryIndexDescriptorImpl(this, null, QueryIndexType.FULLTEXT, 0);
fullTextIdx.addField(field, 0, false);
}
@@ -335,6 +383,34 @@ public class QueryTypeDescriptorImpl implements GridQueryTypeDescriptor {
this.affKey = affKey;
}
+ /**
+ * @return Aliases.
+ */
+ public Map<String, String> aliases() {
+ return aliases != null ? aliases : Collections.<String, String>emptyMap();
+ }
+
+ /**
+ * @param aliases Aliases.
+ */
+ public void aliases(Map<String, String> aliases) {
+ this.aliases = aliases;
+ }
+
+ /**
+ * @return {@code True} if obsolete.
+ */
+ public boolean obsolete() {
+ return obsolete;
+ }
+
+ /**
+ * Mark type descriptor as obsolete.
+ */
+ public void markObsolete() {
+ obsolete = true;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(QueryTypeDescriptorImpl.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java
index f00cbd6..3a7437b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheDefaultAffinityKeyMapper;
import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
+import org.apache.ignite.internal.processors.query.schema.SchemaOperationException;
import org.apache.ignite.internal.processors.query.property.QueryBinaryProperty;
import org.apache.ignite.internal.processors.query.property.QueryClassProperty;
import org.apache.ignite.internal.processors.query.property.QueryFieldAccessor;
@@ -44,7 +45,6 @@ import java.lang.reflect.Method;
import java.math.BigDecimal;
import java.sql.Time;
import java.sql.Timestamp;
-import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -52,6 +52,9 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_INDEXING_DISCOVERY_HISTORY_SIZE;
+import static org.apache.ignite.IgniteSystemProperties.getInteger;
+
/**
* Utility methods for queries.
*/
@@ -59,6 +62,9 @@ public class QueryUtils {
/** */
public static final String _VAL = "_val";
+ /** Discovery history size. */
+ private static final int DISCO_HIST_SIZE = getInteger(IGNITE_INDEXING_DISCOVERY_HISTORY_SIZE, 1000);
+
/** */
private static final Class<?> GEOMETRY_CLASS = U.classForName("com.vividsolutions.jts.geom.Geometry", null);
@@ -82,6 +88,69 @@ public class QueryUtils {
));
/**
+ * Get table name for entity.
+ *
+ * @param entity Entity.
+ * @return Table name.
+ */
+ public static String tableName(QueryEntity entity) {
+ String res = entity.getTableName();
+
+ if (res == null)
+ res = typeName(entity.getValueType());
+
+ return res;
+ }
+
+ /**
+ * Get index name.
+ *
+ * @param entity Query entity.
+ * @param idx Index.
+ * @return Index name.
+ */
+ public static String indexName(QueryEntity entity, QueryIndex idx) {
+ return indexName(tableName(entity), idx);
+ }
+
+ /**
+ * Get index name.
+ *
+ * @param tblName Table name.
+ * @param idx Index.
+ * @return Index name.
+ */
+ public static String indexName(String tblName, QueryIndex idx) {
+ String res = idx.getName();
+
+ if (res == null) {
+ StringBuilder idxName = new StringBuilder(tblName + "_");
+
+ for (Map.Entry<String, Boolean> field : idx.getFields().entrySet()) {
+ idxName.append(field.getKey());
+
+ idxName.append('_');
+ idxName.append(field.getValue() ? "asc_" : "desc_");
+ }
+
+ for (int i = 0; i < idxName.length(); i++) {
+ char ch = idxName.charAt(i);
+
+ if (Character.isWhitespace(ch))
+ idxName.setCharAt(i, '_');
+ else
+ idxName.setCharAt(i, Character.toLowerCase(ch));
+ }
+
+ idxName.append("idx");
+
+ return idxName.toString();
+ }
+
+ return res;
+ }
+
+ /**
* Create type candidate for query entity.
*
* @param space Space.
@@ -103,7 +172,9 @@ public class QueryUtils {
CacheObjectContext coCtx = binaryEnabled ? ctx.cacheObjects().contextForCache(ccfg) : null;
- QueryTypeDescriptorImpl desc = new QueryTypeDescriptorImpl();
+ QueryTypeDescriptorImpl desc = new QueryTypeDescriptorImpl(space);
+
+ desc.aliases(qryEntity.getAliases());
// Key and value classes still can be available if they are primitive or JDK part.
// We need that to set correct types for _key and _val columns.
@@ -206,11 +277,6 @@ public class QueryUtils {
*/
public static void processBinaryMeta(GridKernalContext ctx, QueryEntity qryEntity, QueryTypeDescriptorImpl d)
throws IgniteCheckedException {
- Map<String,String> aliases = qryEntity.getAliases();
-
- if (aliases == null)
- aliases = Collections.emptyMap();
-
Set<String> keyFields = qryEntity.getKeyFields();
// We have to distinguish between empty and null keyFields when the key is not of SQL type -
@@ -239,7 +305,7 @@ public class QueryUtils {
isKeyField = (hasKeyFields ? keyFields.contains(entry.getKey()) : null);
QueryBinaryProperty prop = buildBinaryProperty(ctx, entry.getKey(),
- U.classForName(entry.getValue(), Object.class, true), aliases, isKeyField);
+ U.classForName(entry.getValue(), Object.class, true), d.aliases(), isKeyField);
d.addProperty(prop, false);
}
@@ -256,18 +322,13 @@ public class QueryUtils {
*/
public static void processClassMeta(QueryEntity qryEntity, QueryTypeDescriptorImpl d, CacheObjectContext coCtx)
throws IgniteCheckedException {
- Map<String,String> aliases = qryEntity.getAliases();
-
- if (aliases == null)
- aliases = Collections.emptyMap();
-
for (Map.Entry<String, String> entry : qryEntity.getFields().entrySet()) {
QueryClassProperty prop = buildClassProperty(
d.keyClass(),
d.valueClass(),
entry.getKey(),
U.classForName(entry.getValue(), Object.class),
- aliases,
+ d.aliases(),
coCtx);
d.addProperty(prop, false);
@@ -275,7 +336,7 @@ public class QueryUtils {
processIndexes(qryEntity, d);
}
-
+
/**
* Processes indexes based on query entity.
*
@@ -285,53 +346,90 @@ public class QueryUtils {
*/
private static void processIndexes(QueryEntity qryEntity, QueryTypeDescriptorImpl d) throws IgniteCheckedException {
if (!F.isEmpty(qryEntity.getIndexes())) {
- Map<String, String> aliases = qryEntity.getAliases();
+ for (QueryIndex idx : qryEntity.getIndexes())
+ processIndex(idx, d);
+ }
+ }
- if (aliases == null)
- aliases = Collections.emptyMap();
+ /**
+ * Process dynamic index change.
+ *
+ * @param idx Index.
+ * @param d Type descriptor to populate.
+ * @throws IgniteCheckedException If failed to build index information.
+ */
+ public static void processDynamicIndexChange(String idxName, @Nullable QueryIndex idx, QueryTypeDescriptorImpl d)
+ throws IgniteCheckedException {
+ d.dropIndex(idxName);
- for (QueryIndex idx : qryEntity.getIndexes()) {
- String idxName = idx.getName();
+ if (idx != null)
+ processIndex(idx, d);
+ }
- if (idxName == null)
- idxName = QueryEntity.defaultIndexName(idx);
+ /**
+ * Create index descriptor.
+ *
+ * @param typeDesc Type descriptor.
+ * @param idx Index.
+ * @return Index descriptor.
+ * @throws IgniteCheckedException If failed.
+ */
+ public static QueryIndexDescriptorImpl createIndexDescriptor(QueryTypeDescriptorImpl typeDesc, QueryIndex idx)
+ throws IgniteCheckedException {
+ String idxName = indexName(typeDesc.tableName(), idx);
+ QueryIndexType idxTyp = idx.getIndexType();
- QueryIndexType idxTyp = idx.getIndexType();
+ assert idxTyp == QueryIndexType.SORTED || idxTyp == QueryIndexType.GEOSPATIAL;
- if (idxTyp == QueryIndexType.SORTED || idxTyp == QueryIndexType.GEOSPATIAL) {
- d.addIndex(idxName, idxTyp, idx.getInlineSize());
+ QueryIndexDescriptorImpl res = new QueryIndexDescriptorImpl(typeDesc, idxName, idxTyp, idx.getInlineSize());
- int i = 0;
+ int i = 0;
- for (Map.Entry<String, Boolean> entry : idx.getFields().entrySet()) {
- String field = entry.getKey();
- boolean asc = entry.getValue();
+ for (Map.Entry<String, Boolean> entry : idx.getFields().entrySet()) {
+ String field = entry.getKey();
+ boolean asc = entry.getValue();
- String alias = aliases.get(field);
+ String alias = typeDesc.aliases().get(field);
- if (alias != null)
- field = alias;
+ if (alias != null)
+ field = alias;
- d.addFieldToIndex(idxName, field, i++, idx.getInlineSize(), !asc);
- }
- }
- else if (idxTyp == QueryIndexType.FULLTEXT){
- for (String field : idx.getFields().keySet()) {
- String alias = aliases.get(field);
+ res.addField(field, i++, !asc);
+ }
- if (alias != null)
- field = alias;
+ return res;
+ }
- d.addFieldToTextIndex(field);
- }
- }
- else if (idxTyp != null)
- throw new IllegalArgumentException("Unsupported index type [idx=" + idx.getName() +
- ", typ=" + idxTyp + ']');
- else
- throw new IllegalArgumentException("Index type is not set: " + idx.getName());
+ /**
+ * Process single index.
+ *
+ * @param idx Index.
+ * @param d Type descriptor to populate.
+ * @throws IgniteCheckedException If failed to build index information.
+ */
+ private static void processIndex(QueryIndex idx, QueryTypeDescriptorImpl d) throws IgniteCheckedException {
+ QueryIndexType idxTyp = idx.getIndexType();
+
+ if (idxTyp == QueryIndexType.SORTED || idxTyp == QueryIndexType.GEOSPATIAL) {
+ QueryIndexDescriptorImpl idxDesc = createIndexDescriptor(d, idx);
+
+ d.addIndex(idxDesc);
+ }
+ else if (idxTyp == QueryIndexType.FULLTEXT){
+ for (String field : idx.getFields().keySet()) {
+ String alias = d.aliases().get(field);
+
+ if (alias != null)
+ field = alias;
+
+ d.addFieldToTextIndex(field);
}
}
+ else if (idxTyp != null)
+ throw new IllegalArgumentException("Unsupported index type [idx=" + idx.getName() +
+ ", typ=" + idxTyp + ']');
+ else
+ throw new IllegalArgumentException("Index type is not set: " + idx.getName());
}
/**
@@ -674,6 +772,31 @@ public class QueryUtils {
}
/**
+ * Discovery history size.
+ *
+ * @return Discovery history size.
+ */
+ public static int discoveryHistorySize() {
+ return DISCO_HIST_SIZE;
+ }
+
+ /**
+ * Wrap schema exception if needed.
+ *
+ * @param e Original exception.
+ * @return Schema exception.
+ */
+ @Nullable public static SchemaOperationException wrapIfNeeded(@Nullable Exception e) {
+ if (e == null)
+ return null;
+
+ if (e instanceof SchemaOperationException)
+ return (SchemaOperationException)e;
+
+ return new SchemaOperationException("Unexpected exception.", e);
+ }
+
+ /**
* Private constructor.
*/
private QueryUtils() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaExchangeWorkerTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaExchangeWorkerTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaExchangeWorkerTask.java
new file mode 100644
index 0000000..f97f931
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaExchangeWorkerTask.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.schema;
+
+import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
+import org.apache.ignite.internal.processors.query.schema.message.SchemaAbstractDiscoveryMessage;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Cache schema change task for exchange worker.
+ */
+public class SchemaExchangeWorkerTask implements CachePartitionExchangeWorkerTask {
+ /** Message. */
+ private final SchemaAbstractDiscoveryMessage msg;
+
+ /**
+ * Constructor.
+ *
+ * @param msg Message.
+ */
+ public SchemaExchangeWorkerTask(SchemaAbstractDiscoveryMessage msg) {
+ assert msg != null;
+
+ this.msg = msg;
+ }
+
+ /**
+ * @return Message.
+ */
+ public SchemaAbstractDiscoveryMessage message() {
+ return msg;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(SchemaExchangeWorkerTask.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitor.java
new file mode 100644
index 0000000..3321e66
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitor.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.schema;
+
+import org.apache.ignite.IgniteCheckedException;
+
+/**
+ * Closure that internally applies given {@link SchemaIndexCacheVisitorClosure} to some set of entries.
+ */
+public interface SchemaIndexCacheVisitor {
+ /**
+ * Visit cache entries and pass them to closure.
+ *
+ * @param clo Closure.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void visit(SchemaIndexCacheVisitorClosure clo) throws IgniteCheckedException;
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorClosure.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorClosure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorClosure.java
new file mode 100644
index 0000000..7f50089
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorClosure.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.schema;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+
+/**
+ * Index closure accepting current entry state.
+ */
+public interface SchemaIndexCacheVisitorClosure {
+ /**
+ * Apply closure.
+ *
+ * @param key Key.
+ * @param part Partition.
+ * @param val Value.
+ * @param ver Version.
+ * @param expiration Expiration.
+ * @param link Link.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void apply(KeyCacheObject key, int part, CacheObject val, GridCacheVersion ver, long expiration, long link)
+ throws IgniteCheckedException;
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
new file mode 100644
index 0000000..58c909d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.schema;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.util.lang.GridCursor;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.util.Collection;
+
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.EVICTED;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.RENTING;
+
+/**
+ * Traverser operating on all primary and backup partitions of given cache.
+ */
+public class SchemaIndexCacheVisitorImpl implements SchemaIndexCacheVisitor {
+ /** Query processor. */
+ private final GridQueryProcessor qryProc;
+
+ /** Cache context. */
+ private final GridCacheContext cctx;
+
+ /** Space name. */
+ private final String spaceName;
+
+ /** Table name. */
+ private final String tblName;
+
+ /** Cancellation token. */
+ private final SchemaIndexOperationCancellationToken cancel;
+
+ /**
+ * Constructor.
+ *
+ * @param cctx Cache context.
+ * @param spaceName Space name.
+ * @param tblName Table name.
+ * @param cancel Cancellation token.
+ */
+ public SchemaIndexCacheVisitorImpl(GridQueryProcessor qryProc, GridCacheContext cctx, String spaceName,
+ String tblName, SchemaIndexOperationCancellationToken cancel) {
+ this.qryProc = qryProc;
+ this.spaceName = spaceName;
+ this.tblName = tblName;
+ this.cancel = cancel;
+
+ if (cctx.isNear())
+ cctx = ((GridNearCacheAdapter)cctx.cache()).dht().context();
+
+ this.cctx = cctx;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void visit(SchemaIndexCacheVisitorClosure clo) throws IgniteCheckedException {
+ assert clo != null;
+
+ FilteringVisitorClosure filterClo = new FilteringVisitorClosure(clo);
+
+ Collection<GridDhtLocalPartition> parts = cctx.topology().localPartitions();
+
+ for (GridDhtLocalPartition part : parts)
+ processPartition(part, filterClo);
+ }
+
+ /**
+ * Process partition.
+ *
+ * @param part Partition.
+ * @param clo Index closure.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void processPartition(GridDhtLocalPartition part, FilteringVisitorClosure clo)
+ throws IgniteCheckedException {
+ checkCancelled();
+
+ boolean reserved = false;
+
+ if (part != null && part.state() != EVICTED)
+ reserved = (part.state() == OWNING || part.state() == RENTING) && part.reserve();
+
+ if (!reserved)
+ return;
+
+ try {
+ GridCursor<? extends CacheDataRow> cursor = part.dataStore().cursor();
+
+ while (cursor.next()) {
+ CacheDataRow row = cursor.get();
+
+ KeyCacheObject key = row.key();
+
+ processKey(key, row.link(), clo);
+ }
+ }
+ finally {
+ part.release();
+ }
+ }
+
+ /**
+ * Process single key.
+ *
+ * @param key Key.
+ * @param link Link.
+ * @param clo Closure.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void processKey(KeyCacheObject key, long link, FilteringVisitorClosure clo) throws IgniteCheckedException {
+ while (true) {
+ try {
+ checkCancelled();
+
+ GridCacheEntryEx entry = cctx.cache().entryEx(key);
+
+ try {
+ entry.updateIndex(clo, link);
+ }
+ finally {
+ cctx.evicts().touch(entry, AffinityTopologyVersion.NONE);
+ }
+
+ break;
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ // No-op.
+ }
+ }
+ }
+
+ /**
+ * Check if visit process is not cancelled.
+ *
+ * @throws IgniteCheckedException If cancelled.
+ */
+ private void checkCancelled() throws IgniteCheckedException {
+ if (cancel.isCancelled())
+ throw new IgniteCheckedException("Index creation was cancelled.");
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(SchemaIndexCacheVisitorImpl.class, this);
+ }
+
+ /**
+ * Filtering visitor closure.
+ */
+ private class FilteringVisitorClosure implements SchemaIndexCacheVisitorClosure {
+
+ /** Target closure. */
+ private final SchemaIndexCacheVisitorClosure target;
+
+ /**
+ * Constructor.
+ *
+ * @param target Target.
+ */
+ public FilteringVisitorClosure(SchemaIndexCacheVisitorClosure target) {
+ this.target = target;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void apply(KeyCacheObject key, int part, CacheObject val, GridCacheVersion ver,
+ long expiration, long link) throws IgniteCheckedException {
+ if (qryProc.belongsToTable(cctx, spaceName, tblName, key, val))
+ target.apply(key, part, val, ver, expiration, link);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexOperationCancellationToken.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexOperationCancellationToken.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexOperationCancellationToken.java
new file mode 100644
index 0000000..1bc3434
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexOperationCancellationToken.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.schema;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Token used to signal and observe cancellation of an index operation.
+ */
+public class SchemaIndexOperationCancellationToken {
+ /** Becomes {@code true} once cancellation has been requested; never reset. */
+ private final AtomicBoolean flag = new AtomicBoolean();
+
+ /**
+ * Read the current cancel state.
+ *
+ * @return {@code True} if cancelled.
+ */
+ public boolean isCancelled() {
+ return flag.get();
+ }
+
+ /**
+ * Request cancellation.
+ *
+ * @return {@code True} if cancel flag was set by this call.
+ */
+ public boolean cancel() {
+ return !flag.getAndSet(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(SchemaIndexOperationCancellationToken.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaKey.java
new file mode 100644
index 0000000..3f12b77
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaKey.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.schema;
+
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteUuid;
+
+/**
+ * Schema key composed of a space name and a deployment ID; usable as a hash map key.
+ */
+public class SchemaKey {
+ /** Space. */
+ private final String space;
+
+ /** Deployment ID. */
+ private final IgniteUuid depId;
+
+ /**
+ * Constructor.
+ *
+ * @param space Space.
+ * @param depId Deployment ID.
+ */
+ public SchemaKey(String space, IgniteUuid depId) {
+ this.space = space;
+ this.depId = depId;
+ }
+
+ /** {@inheritDoc} Null-tolerant for both fields, matching the null-safe comparison in {@link #equals(Object)}. */
+ @Override public int hashCode() {
+ return 31 * (space != null ? space.hashCode() : 0) + (depId != null ? depId.hashCode() : 0);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object obj) {
+ if (obj instanceof SchemaKey) {
+ SchemaKey other = (SchemaKey)obj;
+
+ return F.eq(space, other.space) && F.eq(depId, other.depId);
+ }
+
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaNodeLeaveExchangeWorkerTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaNodeLeaveExchangeWorkerTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaNodeLeaveExchangeWorkerTask.java
new file mode 100644
index 0000000..79fbfcd
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaNodeLeaveExchangeWorkerTask.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.schema;
+
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Exchange worker task carrying a cluster node; judging by the class name, the node that left the topology.
+ */
+public class SchemaNodeLeaveExchangeWorkerTask implements CachePartitionExchangeWorkerTask {
+ /** Node this task was created for. */
+ @GridToStringInclude
+ private final ClusterNode node;
+
+ /**
+ * Constructor.
+ *
+ * @param node Node this task refers to.
+ */
+ public SchemaNodeLeaveExchangeWorkerTask(ClusterNode node) {
+ this.node = node;
+ }
+
+ /**
+ * @return Node passed to the constructor.
+ */
+ public ClusterNode node() {
+ return node;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(SchemaNodeLeaveExchangeWorkerTask.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationClientFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationClientFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationClientFuture.java
new file mode 100644
index 0000000..6c47aff
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationClientFuture.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.schema;
+
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.util.UUID;
+
+/**
+ * Schema operation client future: a future tagged with the ID of the schema operation it tracks.
+ */
+public class SchemaOperationClientFuture extends GridFutureAdapter<Object> {
+ /** ID of the tracked operation. */
+ private final UUID opId;
+
+ /**
+ * Constructor.
+ *
+ * @param opId ID of the operation this future tracks.
+ */
+ public SchemaOperationClientFuture(UUID opId) {
+ this.opId = opId;
+ }
+
+ /**
+ * @return Operation ID passed to the constructor.
+ */
+ public UUID operationId() {
+ return opId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(SchemaOperationClientFuture.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationException.java
new file mode 100644
index 0000000..f0db026
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationException.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.schema;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Schema operation exception.
+ */
+public class SchemaOperationException extends IgniteCheckedException {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Code: generic error. */
+ public static final int CODE_GENERIC = 0;
+
+ /** Code: cache not found. */
+ public static final int CODE_CACHE_NOT_FOUND = 1;
+
+ /** Code: table not found. */
+ public static final int CODE_TABLE_NOT_FOUND = 2;
+
+ /** Code: table already exists. */
+ public static final int CODE_TABLE_EXISTS = 3;
+
+ /** Code: column not found. */
+ public static final int CODE_COLUMN_NOT_FOUND = 4;
+
+ /** Code: column already exists. */
+ public static final int CODE_COLUMN_EXISTS = 5;
+
+ /** Code: index not found. */
+ public static final int CODE_INDEX_NOT_FOUND = 6;
+
+ /** Code: index already exists. */
+ public static final int CODE_INDEX_EXISTS = 7;
+
+ /** Error code. */
+ private final int code;
+
+ /**
+ * Constructor for specific error type.
+ *
+ * @param code Code.
+ * @param objName Object name.
+ */
+ public SchemaOperationException(int code, String objName) {
+ super(message(code, objName));
+
+ this.code = code;
+ }
+
+ /**
+ * Constructor for generic error.
+ *
+ * @param msg Message.
+ */
+ public SchemaOperationException(String msg) {
+ this(msg, null);
+ }
+
+ /**
+ * Constructor for generic error.
+ *
+ * @param msg Message.
+ * @param cause Cause.
+ */
+ public SchemaOperationException(String msg, Throwable cause) {
+ super(msg, cause);
+
+ code = CODE_GENERIC;
+ }
+
+ /**
+ * @return Code.
+ */
+ public int code() {
+ return code;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(SchemaOperationException.class, this, "msg", getMessage());
+ }
+
+ /**
+ * Create message for specific code and object name.
+ *
+ * @param code Code.
+ * @param objName Object name.
+ * @return Message.
+ */
+ private static String message(int code, String objName) {
+ switch (code) {
+ case CODE_CACHE_NOT_FOUND:
+ return "Cache doesn't exist: " + objName;
+
+ case CODE_TABLE_NOT_FOUND:
+ return "Table doesn't exist: " + objName;
+
+ case CODE_TABLE_EXISTS:
+ return "Table already exists: " + objName;
+
+ case CODE_COLUMN_NOT_FOUND:
+ return "Column doesn't exist: " + objName;
+
+ case CODE_COLUMN_EXISTS:
+ return "Column already exists: " + objName;
+
+ case CODE_INDEX_NOT_FOUND:
+ return "Index doesn't exist: " + objName;
+
+ case CODE_INDEX_EXISTS:
+ return "Index already exists: " + objName;
+
+ default:
+ assert false;
+
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationManager.java
new file mode 100644
index 0000000..eb0f3cd
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationManager.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.schema;
+
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Schema operation manager.
+ */
+@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+public class SchemaOperationManager {
+ /** Kernal context. */
+ private final GridKernalContext ctx;
+
+ /** Query processor. */
+ private final GridQueryProcessor qryProc;
+
+ /** Logger. */
+ private final IgniteLogger log;
+
+ /** Operation handler. */
+ private final SchemaOperationWorker worker;
+
+ /** Mutex for concurrency control. */
+ private final Object mux = new Object();
+
+ /** Participants. */
+ private Collection<UUID> nodeIds;
+
+ /** Node results. */
+ private Map<UUID, SchemaOperationException> nodeRess;
+
+ /** Current coordinator node. */
+ private ClusterNode crd;
+
+ /** Whether coordinator state is mapped. */
+ private boolean crdMapped;
+
+ /** Coordinator finished flag. */
+ private boolean crdFinished;
+
+ /**
+ * Constructor.
+ *
+ * @param ctx Context.
+ * @param qryProc Query processor.
+ * @param worker Operation handler.
+ * @param crd Coordinator node.
+ */
+ public SchemaOperationManager(GridKernalContext ctx, GridQueryProcessor qryProc, SchemaOperationWorker worker,
+ @Nullable ClusterNode crd) {
+ assert !ctx.clientNode() || crd == null;
+
+ this.ctx = ctx;
+
+ log = ctx.log(SchemaOperationManager.class);
+
+ this.qryProc = qryProc;
+ this.worker = worker;
+
+ synchronized (mux) {
+ this.crd = crd;
+
+ prepareCoordinator();
+ }
+ }
+
+ /**
+ * Map operation handling.
+ */
+ @SuppressWarnings("unchecked")
+ public void start() {
+ worker.start();
+
+ synchronized (mux) {
+ worker.future().listen(new IgniteInClosure<IgniteInternalFuture>() {
+ @Override public void apply(IgniteInternalFuture fut) {
+ onLocalNodeFinished(fut);
+ }
+ });
+ }
+ }
+
+ /**
+ * Handle local node finish.
+ *
+ * @param fut Future.
+ */
+ private void onLocalNodeFinished(IgniteInternalFuture fut) {
+ assert fut.isDone();
+
+ if (ctx.clientNode())
+ return;
+
+ SchemaOperationException err;
+
+ try {
+ fut.get();
+
+ err = null;
+ }
+ catch (Exception e) {
+ err = QueryUtils.wrapIfNeeded(e);
+ }
+
+ synchronized (mux) {
+ if (isLocalCoordinator())
+ onNodeFinished(ctx.localNodeId(), err);
+ else
+ qryProc.sendStatusMessage(crd.id(), operationId(), err);
+ }
+ }
+
+ /**
+ * Handle node finish.
+ *
+ * @param nodeId Node ID.
+ * @param err Error.
+ */
+ public void onNodeFinished(UUID nodeId, @Nullable SchemaOperationException err) {
+ synchronized (mux) {
+ assert isLocalCoordinator();
+
+ if (nodeRess.containsKey(nodeId)) {
+ if (log.isDebugEnabled())
+ log.debug("Received duplicate result [opId=" + operationId() + ", nodeId=" + nodeId +
+ ", err=" + err + ']');
+
+ return;
+ }
+
+ if (nodeIds.contains(nodeId)) {
+ if (log.isDebugEnabled())
+ log.debug("Received result [opId=" + operationId() + ", nodeId=" + nodeId + ", err=" + err + ']');
+
+ nodeRess.put(nodeId, err);
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Received result from non-tracked node (joined after operation started, will ignore) " +
+ "[opId=" + operationId() + ", nodeId=" + nodeId + ", err=" + err + ']');
+ }
+
+ checkFinished();
+ }
+ }
+
+ /**
+ * Handle node leave event: switch coordinator or shrink the wait set, then re-report a finished local result.
+ *
+ * @param nodeId ID of the node that has left the grid.
+ * @param curCrd Current coordinator node.
+ */
+ public void onNodeLeave(UUID nodeId, ClusterNode curCrd) {
+ synchronized (mux) {
+ assert crd != null;
+
+ if (F.eq(nodeId, crd.id())) {
+ // Coordinator has left: adopt the new one; participants get mapped if it is the local node.
+ crd = curCrd;
+
+ prepareCoordinator();
+ }
+ else if (isLocalCoordinator()) {
+ // Other node has left, remove it from the coordinator's wait set.
+ // Handle this as success: any result it already reported is discarded as well.
+ if (nodeIds.remove(nodeId))
+ nodeRess.remove(nodeId);
+ }
+
+ IgniteInternalFuture fut = worker().future();
+
+ if (fut.isDone())
+ onLocalNodeFinished(fut);
+
+ checkFinished();
+ }
+ }
+
+ /**
+ * Check if operation finished: on the local coordinator, notify the query processor once all tracked nodes reported.
+ */
+ private void checkFinished() {
+ assert Thread.holdsLock(mux);
+
+ if (isLocalCoordinator()) {
+ if (crdFinished)
+ return;
+
+ if (nodeIds.size() == nodeRess.size()) {
+ // Initiate finish request; the first non-null error met while iterating the result map is reported.
+ SchemaOperationException err = null;
+
+ for (Map.Entry<UUID, SchemaOperationException> nodeRes : nodeRess.entrySet()) {
+ if (nodeRes.getValue() != null) {
+ err = nodeRes.getValue();
+
+ break;
+ }
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Collected all results, about to send finish message [opId=" + operationId() +
+ ", err=" + err + ']');
+
+ crdFinished = true;
+
+ qryProc.onCoordinatorFinished(worker.operation(), err);
+ }
+ }
+ }
+
+ /**
+ * Prepare topology state in case local node is coordinator.
+ *
+ * @return {@code True} if state was changed by this call.
+ */
+ private boolean prepareCoordinator() {
+ if (isLocalCoordinator() && !crdMapped) {
+ // Initialize local structures.
+ nodeIds = new HashSet<>();
+ nodeRess = new HashMap<>();
+
+ for (ClusterNode alive : ctx.discovery().aliveServerNodes())
+ nodeIds.add(alive.id());
+
+ if (log.isDebugEnabled())
+ log.debug("Mapped participating nodes on coordinator [opId=" + operationId() +
+ ", crdNodeId=" + ctx.localNodeId() + ", nodes=" + nodeIds + ']');
+
+ crdMapped = true;
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Check if current node is local coordinator.
+ *
+ * @return {@code True} if coordinator.
+ */
+ private boolean isLocalCoordinator() {
+ assert Thread.holdsLock(mux);
+
+ return crd != null && crd.isLocal();
+ }
+
+ /**
+ * @return Worker.
+ */
+ public SchemaOperationWorker worker() {
+ return worker;
+ }
+
+ /**
+ * @return Operation ID.
+ */
+ private UUID operationId() {
+ return worker.operation().id();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationWorker.java
new file mode 100644
index 0000000..06feecb
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationWorker.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.schema;
+
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.processors.query.QueryTypeDescriptorImpl;
+import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.thread.IgniteThread;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Schema operation executor.
+ */
+public class SchemaOperationWorker extends GridWorker {
+ /** Query processor */
+ private final GridQueryProcessor qryProc;
+
+ /** Deployment ID. */
+ private final IgniteUuid depId;
+
+ /** Target operation. */
+ private final SchemaAbstractOperation op;
+
+ /** No-op flag. */
+ private final boolean nop;
+
+ /** Whether cache started. */
+ private final boolean cacheRegistered;
+
+ /** Type descriptor. */
+ private final QueryTypeDescriptorImpl type;
+
+ /** Operation future. */
+ private final GridFutureAdapter fut;
+
+ /** Public operation future. */
+ private final GridFutureAdapter pubFut;
+
+ /** Start guard. */
+ private final AtomicBoolean startGuard = new AtomicBoolean();
+
+ /** Cancellation token. */
+ private final SchemaIndexOperationCancellationToken cancelToken = new SchemaIndexOperationCancellationToken();
+
+ /**
+ * Constructor.
+ *
+ * @param ctx Context.
+ * @param qryProc Query processor.
+ * @param depId Deployment ID.
+ * @param op Target operation.
+ * @param nop No-op flag.
+ * @param err Predefined error.
+ * @param cacheRegistered Whether cache is registered in indexing at this point.
+ * @param type Type descriptor (if available).
+ */
+ public SchemaOperationWorker(GridKernalContext ctx, GridQueryProcessor qryProc, IgniteUuid depId,
+ SchemaAbstractOperation op, boolean nop, @Nullable SchemaOperationException err, boolean cacheRegistered,
+ @Nullable QueryTypeDescriptorImpl type) {
+ super(ctx.igniteInstanceName(), workerName(op), ctx.log(SchemaOperationWorker.class));
+
+ this.qryProc = qryProc;
+ this.depId = depId;
+ this.op = op;
+ this.nop = nop;
+ this.cacheRegistered = cacheRegistered;
+ this.type = type;
+
+ fut = new GridFutureAdapter();
+
+ if (err != null)
+ fut.onDone(err);
+ else if (nop || !cacheRegistered)
+ fut.onDone();
+
+ pubFut = publicFuture(fut);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+ try {
+ // Execute the operation locally; the outcome is published through the future rather than thrown.
+ qryProc.processIndexOperationLocal(op, type, depId, cancelToken);
+
+ fut.onDone();
+ }
+ catch (Exception e) { // NOTE(review): an Error is not caught, so fut is not completed here in that case - confirm.
+ fut.onDone(QueryUtils.wrapIfNeeded(e));
+ }
+ }
+
+ /**
+ * Perform initialization routine.
+ *
+ * @return This instance.
+ */
+ public SchemaOperationWorker start() {
+ if (startGuard.compareAndSet(false, true)) {
+ if (!fut.isDone())
+ new IgniteThread(this).start();
+ }
+
+ return this;
+ }
+
+ /**
+ * Chain the future making sure that operation is completed after local schema is updated.
+ *
+ * @param fut Current future.
+ * @return Chained future.
+ */
+ @SuppressWarnings("unchecked")
+ private GridFutureAdapter<?> publicFuture(GridFutureAdapter fut) {
+ final GridFutureAdapter<?> chainedFut = new GridFutureAdapter<>();
+
+ fut.listen(new IgniteInClosure<IgniteInternalFuture>() {
+ @Override public void apply(IgniteInternalFuture fut) {
+ Exception err = null;
+
+ try {
+ fut.get();
+
+ if (cacheRegistered && !nop)
+ qryProc.onLocalOperationFinished(op, type);
+ }
+ catch (Exception e) {
+ err = e;
+ }
+ finally {
+ chainedFut.onDone(null, err);
+ }
+ }
+ });
+
+ return chainedFut;
+ }
+
+ /**
+ * @return No-op flag.
+ */
+ public boolean nop() {
+ return nop;
+ }
+
+ /**
+ * @return Whether cache is registered.
+ */
+ public boolean cacheRegistered() {
+ return cacheRegistered;
+ }
+
+ /**
+ * Cancel operation: trips the cancellation token and, only for the call that tripped it, cancels the worker.
+ */
+ @Override public void cancel() {
+ if (cancelToken.cancel())
+ super.cancel();
+ }
+
+ /**
+ * @return Operation.
+ */
+ public SchemaAbstractOperation operation() {
+ return op;
+ }
+
+ /**
+ * @return Future completed when operation is ready.
+ */
+ public IgniteInternalFuture future() {
+ return pubFut;
+ }
+
+ /**
+ * @return Worker name.
+ */
+ private static String workerName(SchemaAbstractOperation op) {
+ return "schema-op-worker-" + op.id();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java
new file mode 100644
index 0000000..9fdc6c3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.schema.message;
+
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+
+/**
+ * Abstract discovery message for schema operations; holds the operation and a unique message ID.
+ */
+public abstract class SchemaAbstractDiscoveryMessage implements DiscoveryCustomMessage {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Message ID, randomly generated when the message object is created. */
+ private final IgniteUuid id = IgniteUuid.randomUuid();
+
+ /** Operation. */
+ @GridToStringInclude
+ protected final SchemaAbstractOperation op;
+
+ /**
+ * Constructor.
+ *
+ * @param op Operation carried by this message.
+ */
+ protected SchemaAbstractDiscoveryMessage(SchemaAbstractOperation op) {
+ this.op = op;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid id() {
+ return id;
+ }
+
+ /**
+ * @return Operation.
+ */
+ public SchemaAbstractOperation operation() {
+ return op;
+ }
+
+ /**
+ * @return Whether request must be propagated to exchange thread.
+ */
+ public abstract boolean exchange();
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(SchemaAbstractDiscoveryMessage.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java
new file mode 100644
index 0000000..2245b24
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.schema.message;
+
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.processors.query.schema.SchemaOperationException;
+import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Discovery message signalling that a schema change has finished.
+ */
+public class SchemaFinishDiscoveryMessage extends SchemaAbstractDiscoveryMessage {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Error carried by this message; {@code null} when there is none. */
+    private final SchemaOperationException err;
+
+    /** Original propose message (transient; assigned through {@link #proposeMessage(SchemaProposeDiscoveryMessage)}). */
+    private transient SchemaProposeDiscoveryMessage proposeMsg;
+
+    /**
+     * Creates a finish message for the given operation.
+     *
+     * @param op Original operation.
+     * @param err Error, or {@code null} if there is nothing to report.
+     */
+    public SchemaFinishDiscoveryMessage(SchemaAbstractOperation op, SchemaOperationException err) {
+        super(op);
+
+        this.err = err;
+    }
+
+    /**
+     * @return Error carried by this message, or {@code null} if there is none.
+     */
+    @Nullable public SchemaOperationException error() {
+        return this.err;
+    }
+
+    /**
+     * @return {@code True} if this message carries an error.
+     */
+    public boolean hasError() {
+        return null != this.err;
+    }
+
+    /**
+     * @param proposeMsg Propose message.
+     */
+    public void proposeMessage(SchemaProposeDiscoveryMessage proposeMsg) {
+        this.proposeMsg = proposeMsg;
+    }
+
+    /**
+     * @return Propose message previously assigned through the setter.
+     */
+    public SchemaProposeDiscoveryMessage proposeMessage() {
+        return this.proposeMsg;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean exchange() {
+        // Always reports false for finish messages.
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isMutable() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+        // No acknowledgement message is produced.
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(SchemaFinishDiscoveryMessage.class, this, "parent", super.toString());
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaOperationStatusMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaOperationStatusMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaOperationStatusMessage.java
new file mode 100644
index 0000000..5f75e60
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaOperationStatusMessage.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.schema.message;
+
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+/**
+ * Communication message reporting the status of a schema operation.
+ */
+public class SchemaOperationStatusMessage implements Message {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Operation ID. */
+    private UUID opId;
+
+    /** Error bytes; may be {@code null}. */
+    private byte[] errBytes;
+
+    /** Sender node ID. Not written by {@link #writeTo} and not read by {@link #readFrom}. */
+    @GridDirectTransient
+    private UUID sndNodeId;
+
+    /**
+     * Empty constructor; leaves all fields unset.
+     */
+    public SchemaOperationStatusMessage() {
+        // No-op.
+    }
+
+    /**
+     * Creates a status message for the given operation.
+     *
+     * @param opId Operation ID.
+     * @param errBytes Error bytes.
+     */
+    public SchemaOperationStatusMessage(UUID opId, byte[] errBytes) {
+        this.errBytes = errBytes;
+        this.opId = opId;
+    }
+
+    /**
+     * @return Operation ID.
+     */
+    public UUID operationId() {
+        return this.opId;
+    }
+
+    /**
+     * @return Error bytes, or {@code null} if none were set.
+     */
+    @Nullable public byte[] errorBytes() {
+        return this.errBytes;
+    }
+
+    /**
+     * @param sndNodeId Sender node ID.
+     */
+    public void senderNodeId(UUID sndNodeId) {
+        this.sndNodeId = sndNodeId;
+    }
+
+    /**
+     * @return Sender node ID.
+     */
+    public UUID senderNodeId() {
+        return this.sndNodeId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return -53;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        // Two fields are transferred: opId and errBytes.
+        return 2;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount())) {
+                return false;
+            }
+
+            writer.onHeaderWritten();
+        }
+
+        // Resume from the writer's current state; cases deliberately fall through.
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeUuid("opId", opId)) {
+                    return false;
+                }
+
+                writer.incrementState();
+
+                // Fall through.
+            case 1:
+                if (!writer.writeByteArray("errBytes", errBytes)) {
+                    return false;
+                }
+
+                writer.incrementState();
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead()) {
+            return false;
+        }
+
+        // Resume from the reader's current state; cases deliberately fall through.
+        switch (reader.state()) {
+            case 0:
+                opId = reader.readUuid("opId");
+
+                if (!reader.isLastRead()) {
+                    return false;
+                }
+
+                reader.incrementState();
+
+                // Fall through.
+            case 1:
+                errBytes = reader.readByteArray("errBytes");
+
+                if (!reader.isLastRead()) {
+                    return false;
+                }
+
+                reader.incrementState();
+        }
+
+        return reader.afterMessageRead(SchemaOperationStatusMessage.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(SchemaOperationStatusMessage.class, this);
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java
new file mode 100644
index 0000000..664ee03
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.schema.message;
+
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.processors.query.schema.SchemaKey;
+import org.apache.ignite.internal.processors.query.schema.SchemaOperationException;
+import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Schema change propose discovery message.
+ */
+public class SchemaProposeDiscoveryMessage extends SchemaAbstractDiscoveryMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Cache deployment ID. */
+ private IgniteUuid depId;
+
+ /** Error. */
+ private SchemaOperationException err;
+
+ /** Whether to perform exchange. */
+ private transient boolean exchange;
+
+ /**
+ * Constructor.
+ *
+ * @param op Operation.
+ */
+ public SchemaProposeDiscoveryMessage(SchemaAbstractOperation op) {
+ super(op);
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isMutable() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean exchange() {
+ return exchange;
+ }
+
+ /**
+ * @param exchange Whether to perform exchange.
+ */
+ public void exchange(boolean exchange) {
+ this.exchange = exchange;
+ }
+
+ /**
+ * @return Deployment ID.
+ */
+ @Nullable public IgniteUuid deploymentId() {
+ return depId;
+ }
+
+ /**
+ * @param depId Deployment ID.
+ */
+ public void deploymentId(IgniteUuid depId) {
+ this.depId = depId;
+ }
+
+ /**
+ *
+ * @return {@code True} if message is initialized.
+ */
+ public boolean initialized() {
+ return deploymentId() != null || hasError();
+ }
+
+ /**
+ * Set error.
+ *
+ * @param err Error.
+ */
+ public void onError(SchemaOperationException err) {
+ if (!hasError()) {
+ this.err = err;
+ }
+ }
+
+ /**
+ * @return {@code True} if error was reported during init.
+ */
+ public boolean hasError() {
+ return err != null;
+ }
+
+ /**
+ * @return Error message (if any).
+ */
+ @Nullable public SchemaOperationException error() {
+ return err;
+ }
+
+ /**
+ * @return Schema key.
+ */
+ public SchemaKey schemaKey() {
+ return new SchemaKey(operation().space(), depId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(SchemaProposeDiscoveryMessage.class, this, "parent", super.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaAbstractOperation.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaAbstractOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaAbstractOperation.java
new file mode 100644
index 0000000..8418ece
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaAbstractOperation.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.schema.operation;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.io.Serializable;
+import java.util.UUID;
+
+/**
+ * Base class for operations on schema; holds the operation ID and the space.
+ */
+public abstract class SchemaAbstractOperation implements Serializable {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Operation ID. */
+    private final UUID opId;
+
+    /** Space. */
+    private final String space;
+
+    /**
+     * Creates an operation with the given ID and space.
+     *
+     * @param opId Operation ID.
+     * @param space Space.
+     */
+    public SchemaAbstractOperation(UUID opId, String space) {
+        this.space = space;
+        this.opId = opId;
+    }
+
+    /**
+     * @return Space.
+     */
+    public String space() {
+        return this.space;
+    }
+
+    /**
+     * @return Operation id.
+     */
+    public UUID id() {
+        return this.opId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(SchemaAbstractOperation.class, this);
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaIndexAbstractOperation.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaIndexAbstractOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaIndexAbstractOperation.java
new file mode 100644
index 0000000..fc4a9ff
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaIndexAbstractOperation.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.schema.operation;
+
+import java.util.UUID;
+
+/**
+ * Schema index abstract operation: a schema operation that targets a named index.
+ */
+public abstract class SchemaIndexAbstractOperation extends SchemaAbstractOperation {
+    /**
+     * Explicit serial version UID. The class is serializable through its parent, so the UID is
+     * pinned here (as in the sibling operation classes) rather than left to the compiler-dependent default.
+     */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Constructor.
+     *
+     * @param opId Operation ID.
+     * @param space Space.
+     */
+    public SchemaIndexAbstractOperation(UUID opId, String space) {
+        super(opId, space);
+    }
+
+    /**
+     * @return Index name.
+     */
+    public abstract String indexName();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaIndexCreateOperation.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaIndexCreateOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaIndexCreateOperation.java
new file mode 100644
index 0000000..9281f2a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaIndexCreateOperation.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.schema.operation;
+
+import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.util.UUID;
+
+/**
+ * Schema operation that creates an index.
+ */
+public class SchemaIndexCreateOperation extends SchemaIndexAbstractOperation {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Table name. */
+    private final String tblName;
+
+    /** Index params. */
+    @GridToStringInclude
+    private final QueryIndex idx;
+
+    /** Ignore operation if index exists. */
+    private final boolean ifNotExists;
+
+    /**
+     * Creates an index create operation.
+     *
+     * @param opId Operation id.
+     * @param space Space.
+     * @param tblName Table name.
+     * @param idx Index params.
+     * @param ifNotExists Ignore operation if index exists.
+     */
+    public SchemaIndexCreateOperation(UUID opId, String space, String tblName, QueryIndex idx, boolean ifNotExists) {
+        super(opId, space);
+
+        this.ifNotExists = ifNotExists;
+        this.idx = idx;
+        this.tblName = tblName;
+    }
+
+    /**
+     * @return Ignore operation if index exists.
+     */
+    public boolean ifNotExists() {
+        return this.ifNotExists;
+    }
+
+    /**
+     * @return Index params.
+     */
+    public QueryIndex index() {
+        return this.idx;
+    }
+
+    /**
+     * @return Table name.
+     */
+    public String tableName() {
+        return this.tblName;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String indexName() {
+        // The name is computed by QueryUtils from the table name and the index params.
+        return QueryUtils.indexName(tblName, idx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(SchemaIndexCreateOperation.class, this, "parent", super.toString());
+    }
+}