You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/04/20 07:49:08 UTC
[35/70] [abbrv] ignite git commit: IGNITE-4565: Implemented CREATE
INDEX and DROP INDEX. This closes #1773. This closes #1804.
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 2b957be..ceb139a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -17,34 +17,29 @@
package org.apache.ignite.internal.processors.query;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-import javax.cache.Cache;
-import javax.cache.CacheException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.binary.Binarylizable;
+import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryIndex;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.SqlQuery;
+import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.events.CacheQueryExecutedEvent;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -54,7 +49,23 @@ import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
import org.apache.ignite.internal.processors.cache.query.QueryCursorEx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor;
+import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorImpl;
+import org.apache.ignite.internal.processors.query.schema.SchemaIndexOperationCancellationToken;
+import org.apache.ignite.internal.processors.query.schema.SchemaKey;
+import org.apache.ignite.internal.processors.query.schema.SchemaOperationClientFuture;
+import org.apache.ignite.internal.processors.query.schema.SchemaOperationException;
+import org.apache.ignite.internal.processors.query.schema.SchemaOperationManager;
+import org.apache.ignite.internal.processors.query.schema.SchemaOperationWorker;
+import org.apache.ignite.internal.processors.query.schema.message.SchemaAbstractDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.schema.message.SchemaOperationStatusMessage;
+import org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation;
+import org.apache.ignite.internal.processors.query.schema.operation.SchemaIndexCreateOperation;
+import org.apache.ignite.internal.processors.query.schema.operation.SchemaIndexDropOperation;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
+import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -63,17 +74,45 @@ import org.apache.ignite.internal.util.lang.GridClosureException;
import org.apache.ignite.internal.util.lang.IgniteOutClosureX;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.internal.util.worker.GridWorkerFuture;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
import org.apache.ignite.spi.indexing.IndexingQueryFilter;
+import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentHashMap8;
+
+import javax.cache.Cache;
+import javax.cache.CacheException;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
+import static org.apache.ignite.internal.GridTopic.TOPIC_SCHEMA;
import static org.apache.ignite.internal.IgniteComponentType.INDEXING;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SCHEMA_POOL;
/**
* Indexing processor.
@@ -82,26 +121,70 @@ public class GridQueryProcessor extends GridProcessorAdapter {
/** Queries detail metrics eviction frequency. */
private static final int QRY_DETAIL_METRICS_EVICTION_FREQ = 3_000;
+ /** */
+ private static final ThreadLocal<AffinityTopologyVersion> requestTopVer = new ThreadLocal<>();
+
/** For tests. */
public static Class<? extends GridQueryIndexing> idxCls;
+ /** JDK marshaller to serialize errors. */
+ private final JdkMarshaller marsh = new JdkMarshaller();
+
/** */
private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
+ /** */
+ private GridTimeoutProcessor.CancelableTask qryDetailMetricsEvictTask;
+
/** Type descriptors. */
- private final Map<QueryTypeIdKey, QueryTypeDescriptorImpl> types = new ConcurrentHashMap8<>();
+ private final Map<QueryTypeIdKey, QueryTypeDescriptorImpl> types = new ConcurrentHashMap<>();
/** Type descriptors. */
- private final ConcurrentMap<QueryTypeNameKey, QueryTypeDescriptorImpl> typesByName = new ConcurrentHashMap8<>();
+ private final ConcurrentMap<QueryTypeNameKey, QueryTypeDescriptorImpl> typesByName = new ConcurrentHashMap<>();
/** */
private final GridQueryIndexing idx;
- /** */
- private GridTimeoutProcessor.CancelableTask qryDetailMetricsEvictTask;
+ /** All indexes. */
+ private final ConcurrentMap<QueryIndexKey, QueryIndexDescriptorImpl> idxs = new ConcurrentHashMap<>();
- /** */
- private static final ThreadLocal<AffinityTopologyVersion> requestTopVer = new ThreadLocal<>();
+ /** Schema operation futures created on client side. */
+ private final ConcurrentMap<UUID, SchemaOperationClientFuture> schemaCliFuts = new ConcurrentHashMap<>();
+
+ /** IO message listener. */
+ private final GridMessageListener ioLsnr;
+
+ /** Schema operations. */
+ private final ConcurrentHashMap<SchemaKey, SchemaOperation> schemaOps = new ConcurrentHashMap<>();
+
+ /** Active propose messages. */
+ private final LinkedHashMap<UUID, SchemaProposeDiscoveryMessage> activeProposals = new LinkedHashMap<>();
+
+ /** General state mutex. */
+ private final Object stateMux = new Object();
+
+ /** Coordinator node (initialized lazily). */
+ private ClusterNode crd;
+
+ /** Registered spaces. */
+ private final Collection<String> spaces = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+
+ /** ID history for index create/drop discovery messages. */
+ private final GridBoundedConcurrentLinkedHashSet<IgniteUuid> dscoMsgIdHist =
+ new GridBoundedConcurrentLinkedHashSet<>(QueryUtils.discoveryHistorySize());
+
+ /** History of already completed operations. */
+ private final GridBoundedConcurrentLinkedHashSet<UUID> completedOpIds =
+ new GridBoundedConcurrentLinkedHashSet<>(QueryUtils.discoveryHistorySize());
+
+ /** Pending status messages. */
+ private final LinkedList<SchemaOperationStatusMessage> pendingMsgs = new LinkedList<>();
+
+ /** Disconnected flag. */
+ private boolean disconnected;
+
+ /** Whether exchange thread is ready to process further requests. */
+ private boolean exchangeReady;
/** */
private boolean skipFieldLookup;
@@ -119,6 +202,20 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
else
idx = INDEXING.inClassPath() ? U.<GridQueryIndexing>newInstance(INDEXING.className()) : null;
+
+ ioLsnr = new GridMessageListener() {
+ @Override public void onMessage(UUID nodeId, Object msg) {
+ if (msg instanceof SchemaOperationStatusMessage) {
+ SchemaOperationStatusMessage msg0 = (SchemaOperationStatusMessage)msg;
+
+ msg0.senderNodeId(nodeId);
+
+ processStatusMessage(msg0);
+ }
+ else
+ U.warn(log, "Unsupported IO message: " + msg);
+ }
+ };
}
/** {@inheritDoc} */
@@ -131,6 +228,8 @@ public class GridQueryProcessor extends GridProcessorAdapter {
idx.start(ctx, busyLock);
}
+ ctx.io().addMessageListener(TOPIC_SCHEMA, ioLsnr);
+
// Schedule queries detail metrics eviction.
qryDetailMetricsEvictTask = ctx.timeout().schedule(new Runnable() {
@Override public void run() {
@@ -140,6 +239,401 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}, QRY_DETAIL_METRICS_EVICTION_FREQ, QRY_DETAIL_METRICS_EVICTION_FREQ);
}
+ /** {@inheritDoc} */
+ @Override public void onKernalStop(boolean cancel) {
+ super.onKernalStop(cancel);
+
+ if (cancel && idx != null) {
+ try {
+ while (!busyLock.tryBlock(500))
+ idx.cancelAllQueries();
+
+ return;
+ } catch (InterruptedException ignored) {
+ U.warn(log, "Interrupted while waiting for active queries cancellation.");
+
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ busyLock.block();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop(boolean cancel) throws IgniteCheckedException {
+ super.stop(cancel);
+
+ ctx.io().removeMessageListener(TOPIC_SCHEMA, ioLsnr);
+
+ if (idx != null)
+ idx.stop();
+
+ U.closeQuiet(qryDetailMetricsEvictTask);
+ }
+
+ /**
+ * Handle cache kernal start. At this point discovery and IO managers are operational, caches are not started yet.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ public void onCacheKernalStart() throws IgniteCheckedException {
+ synchronized (stateMux) {
+ exchangeReady = true;
+
+ // Re-run pending top-level proposals.
+ for (SchemaOperation schemaOp : schemaOps.values())
+ onSchemaPropose(schemaOp.proposeMessage());
+ }
+ }
+
+ /**
+ * Handle cache reconnect.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ public void onCacheReconnect() throws IgniteCheckedException {
+ synchronized (stateMux) {
+ assert disconnected;
+
+ disconnected = false;
+
+ onCacheKernalStart();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() {
+ return DiscoveryDataExchangeType.QUERY_PROC;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
+ // Collect active proposals.
+ synchronized (stateMux) {
+ LinkedHashMap<UUID, SchemaProposeDiscoveryMessage> data = new LinkedHashMap<>(activeProposals);
+
+ dataBag.addGridCommonData(DiscoveryDataExchangeType.QUERY_PROC.ordinal(), data);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) {
+ synchronized (stateMux) {
+ // Preserve proposals.
+ LinkedHashMap<UUID, SchemaProposeDiscoveryMessage> data0 =
+ (LinkedHashMap<UUID, SchemaProposeDiscoveryMessage>)data.commonData();
+
+ // Process proposals as if they were received as regular discovery messages.
+ if (data0 != null) {
+ for (SchemaProposeDiscoveryMessage activeProposal : data0.values())
+ onSchemaProposeDiscovery0(activeProposal);
+ }
+ }
+ }
+
+ /**
+ * Process schema propose message from discovery thread.
+ *
+ * @param msg Message.
+ * @return {@code True} if exchange should be triggered.
+ */
+ private boolean onSchemaProposeDiscovery(SchemaProposeDiscoveryMessage msg) {
+ UUID opId = msg.operation().id();
+ String space = msg.operation().space();
+
+ if (!msg.initialized()) {
+ // Ensure cache exists on coordinator node.
+ DynamicCacheDescriptor cacheDesc = ctx.cache().cacheDescriptor(space);
+
+ if (cacheDesc == null) {
+ if (log.isDebugEnabled())
+ log.debug("Received schema propose discovery message, but cache doesn't exist " +
+ "(will report error) [opId=" + opId + ", msg=" + msg + ']');
+
+ msg.onError(new SchemaOperationException(SchemaOperationException.CODE_CACHE_NOT_FOUND, space));
+ }
+ else {
+ CacheConfiguration ccfg = cacheDesc.cacheConfiguration();
+
+ if (ccfg.getCacheMode() == CacheMode.LOCAL) {
+ // Distributed operation is not allowed on LOCAL caches.
+ if (log.isDebugEnabled())
+ log.debug("Received schema propose discovery message, but cache is LOCAL " +
+ "(will report error) [opId=" + opId + ", msg=" + msg + ']');
+
+ msg.onError(new SchemaOperationException("Schema changes are not supported for LOCAL cache."));
+ }
+ else {
+ // Preserve deployment ID so that we can distinguish between different caches with the same name.
+ if (msg.deploymentId() == null)
+ msg.deploymentId(cacheDesc.deploymentId());
+
+ assert F.eq(cacheDesc.deploymentId(), msg.deploymentId());
+ }
+ }
+ }
+
+ // Complete client future and exit immediately in case of error.
+ if (msg.hasError()) {
+ SchemaOperationClientFuture cliFut = schemaCliFuts.remove(opId);
+
+ if (cliFut != null)
+ cliFut.onDone(msg.error());
+
+ return false;
+ }
+
+ return onSchemaProposeDiscovery0(msg);
+ }
+
+ /**
+ * Process schema propose message from discovery thread (or from cache start routine).
+ *
+ * @param msg Message.
+ * @return {@code True} if exchange should be triggered.
+ */
+ private boolean onSchemaProposeDiscovery0(SchemaProposeDiscoveryMessage msg) {
+ UUID opId = msg.operation().id();
+
+ synchronized (stateMux) {
+ if (disconnected) {
+ if (log.isDebugEnabled())
+ log.debug("Processing discovery schema propose message, but node is disconnected (will ignore) " +
+ "[opId=" + opId + ", msg=" + msg + ']');
+
+ return false;
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Processing discovery schema propose message [opId=" + opId + ", msg=" + msg + ']');
+
+ // Put message to active operations set.
+ SchemaProposeDiscoveryMessage oldDesc = activeProposals.put(msg.operation().id(), msg);
+
+ assert oldDesc == null;
+
+ // Create schema operation and either trigger it immediately from exchange thread or append to already
+ // running operation.
+ SchemaOperation schemaOp = new SchemaOperation(msg);
+
+ SchemaKey key = msg.schemaKey();
+
+ SchemaOperation prevSchemaOp = schemaOps.get(key);
+
+ if (prevSchemaOp != null) {
+ prevSchemaOp = prevSchemaOp.unwind();
+
+ if (log.isDebugEnabled())
+ log.debug("Schema change is enqueued and will be executed after previous operation is completed " +
+ "[opId=" + opId + ", prevOpId=" + prevSchemaOp.id() + ']');
+
+ prevSchemaOp.next(schemaOp);
+
+ return false;
+ }
+ else {
+ schemaOps.put(key, schemaOp);
+
+ return exchangeReady;
+ }
+ }
+ }
+
+ /**
+ * Handle schema propose from exchange thread.
+ *
+ * @param msg Discovery message.
+ */
+ @SuppressWarnings("ThrowableInstanceNeverThrown")
+ public void onSchemaPropose(SchemaProposeDiscoveryMessage msg) {
+ UUID opId = msg.operation().id();
+
+ if (log.isDebugEnabled())
+ log.debug("Processing schema propose message (exchange) [opId=" + opId + ']');
+
+ synchronized (stateMux) {
+ if (disconnected)
+ return;
+
+ SchemaOperation curOp = schemaOps.get(msg.schemaKey());
+
+ assert curOp != null;
+ assert F.eq(opId, curOp.id());
+ assert !curOp.started();
+
+ startSchemaChange(curOp);
+ }
+ }
+
+ /**
+ * Process schema finish message from discovery thread.
+ *
+ * @param msg Message.
+ */
+ @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+ private void onSchemaFinishDiscovery(SchemaFinishDiscoveryMessage msg) {
+ UUID opId = msg.operation().id();
+
+ if (log.isDebugEnabled())
+ log.debug("Received schema finish message (discovery) [opId=" + opId + ", msg=" + msg + ']');
+
+ synchronized (stateMux) {
+ if (disconnected)
+ return;
+
+ boolean completedOpAdded = completedOpIds.add(opId);
+
+ assert completedOpAdded;
+
+ // Remove propose message so that it will not be shared with joining nodes.
+ SchemaProposeDiscoveryMessage proposeMsg = activeProposals.remove(opId);
+
+ assert proposeMsg != null;
+
+ // Apply changes to public cache schema if operation is successful and original cache is still there.
+ if (!msg.hasError()) {
+ DynamicCacheDescriptor cacheDesc = ctx.cache().cacheDescriptor(msg.operation().space());
+
+ if (cacheDesc != null && F.eq(cacheDesc.deploymentId(), proposeMsg.deploymentId()))
+ cacheDesc.schemaChangeFinish(msg);
+ }
+
+ // Propose message will be used from exchange thread to
+ msg.proposeMessage(proposeMsg);
+
+ if (exchangeReady) {
+ SchemaOperation op = schemaOps.get(proposeMsg.schemaKey());
+
+ if (F.eq(op.id(), opId)) {
+ // Completed top operation.
+ op.finishMessage(msg);
+
+ if (op.started())
+ op.doFinish();
+ }
+ else {
+ // Completed operation in the middle, will schedule completion later.
+ while (op != null) {
+ if (F.eq(op.id(), opId))
+ break;
+
+ op = op.next();
+ }
+
+ assert op != null;
+ assert !op.started();
+
+ op.finishMessage(msg);
+ }
+ }
+ else {
+ // Set next operation as top-level one.
+ SchemaKey schemaKey = proposeMsg.schemaKey();
+
+ SchemaOperation op = schemaOps.remove(schemaKey);
+
+ assert op != null;
+ assert F.eq(op.id(), opId);
+
+ // Chain to the next operation (if any).
+ SchemaOperation nextOp = op.next();
+
+ if (nextOp != null)
+ schemaOps.put(schemaKey, nextOp);
+ }
+
+ // Clean stale IO messages from just-joined nodes.
+ cleanStaleStatusMessages(opId);
+ }
+
+ // Complete client future (if any).
+ SchemaOperationClientFuture cliFut = schemaCliFuts.remove(opId);
+
+ if (cliFut != null) {
+ if (msg.hasError())
+ cliFut.onDone(msg.error());
+ else
+ cliFut.onDone();
+ }
+ }
+
+ /**
+ * Initiate actual schema change operation.
+ *
+ * @param schemaOp Schema operation.
+ */
+ @SuppressWarnings({"unchecked", "ThrowableInstanceNeverThrown"})
+ private void startSchemaChange(SchemaOperation schemaOp) {
+ assert Thread.holdsLock(stateMux);
+ assert !schemaOp.started();
+
+ // Get current cache state.
+ SchemaProposeDiscoveryMessage msg = schemaOp.proposeMessage();
+
+ String space = msg.operation().space();
+
+ DynamicCacheDescriptor cacheDesc = ctx.cache().cacheDescriptor(space);
+
+ boolean cacheExists = cacheDesc != null && F.eq(msg.deploymentId(), cacheDesc.deploymentId());
+
+ boolean cacheRegistered = cacheExists && spaces.contains(CU.mask(space));
+
+ // Validate schema state and decide whether we should proceed or not.
+ SchemaAbstractOperation op = msg.operation();
+
+ QueryTypeDescriptorImpl type = null;
+ SchemaOperationException err;
+
+ boolean nop = false;
+
+ if (cacheExists) {
+ if (cacheRegistered) {
+ // If cache is started, we perform validation against real schema.
+ T3<QueryTypeDescriptorImpl, Boolean, SchemaOperationException> res = prepareChangeOnStartedCache(op);
+
+ assert res.get2() != null;
+
+ type = res.get1();
+ nop = res.get2();
+ err = res.get3();
+ }
+ else {
+ // If cache is not started yet, there is no schema. Take schema from cache descriptor and validate.
+ QuerySchema schema = cacheDesc.schema();
+
+ T2<Boolean, SchemaOperationException> res = prepareChangeOnNotStartedCache(op, schema);
+
+ assert res.get1() != null;
+
+ type = null;
+ nop = res.get1();
+ err = res.get2();
+ }
+ }
+ else
+ err = new SchemaOperationException(SchemaOperationException.CODE_CACHE_NOT_FOUND, op.space());
+
+ // Start operation.
+ SchemaOperationWorker worker =
+ new SchemaOperationWorker(ctx, this, msg.deploymentId(), op, nop, err, cacheRegistered, type);
+
+ SchemaOperationManager mgr = new SchemaOperationManager(ctx, this, worker,
+ ctx.clientNode() ? null : coordinator());
+
+ schemaOp.manager(mgr);
+
+ mgr.start();
+
+ // Unwind pending IO messages.
+ if (!ctx.clientNode() && coordinator().isLocal())
+ unwindPendingMessages(schemaOp.id(), mgr);
+
+ // Schedule operation finish handling if needed.
+ if (schemaOp.hasFinishMessage())
+ schemaOp.doFinish();
+ }
+
/**
* @return {@code true} If indexing module is in classpath and successfully initialized.
*/
@@ -148,55 +642,115 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
/**
+ * @return Indexing.
+ * @throws IgniteException If module is not enabled.
+ */
+ public GridQueryIndexing getIndexing() throws IgniteException {
+ checkxEnabled();
+
+ return idx;
+ }
+
+ /**
* @param cctx Cache context.
+ * @param schema Initial schema.
* @throws IgniteCheckedException If failed.
*/
- @SuppressWarnings("deprecation")
- private void initializeCache(GridCacheContext<?, ?> cctx) throws IgniteCheckedException {
+ @SuppressWarnings({"deprecation", "ThrowableResultOfMethodCallIgnored"})
+ private void initializeCache(GridCacheContext<?, ?> cctx, QuerySchema schema) throws IgniteCheckedException {
String space = cctx.name();
- CacheConfiguration<?,?> ccfg = cctx.config();
-
// Prepare candidates.
List<Class<?>> mustDeserializeClss = new ArrayList<>();
Collection<QueryTypeCandidate> cands = new ArrayList<>();
- if (!F.isEmpty(ccfg.getQueryEntities())) {
- for (QueryEntity qryEntity : ccfg.getQueryEntities()) {
+ Collection<QueryEntity> qryEntities = schema.entities();
+
+ if (!F.isEmpty(qryEntities)) {
+ for (QueryEntity qryEntity : qryEntities) {
QueryTypeCandidate cand = QueryUtils.typeForQueryEntity(space, cctx, qryEntity, mustDeserializeClss);
cands.add(cand);
}
}
- // Register candidates.
- idx.registerCache(space, cctx, cctx.config());
+ // Ensure that candidates has unique index names. Otherwise we will not be able to apply pending operations.
+ Map<String, QueryTypeDescriptorImpl> tblTypMap = new HashMap<>();
+ Map<String, QueryTypeDescriptorImpl> idxTypMap = new HashMap<>();
- try {
- for (QueryTypeCandidate cand : cands) {
- QueryTypeIdKey typeId = cand.typeId();
- QueryTypeIdKey altTypeId = cand.alternativeTypeId();
- QueryTypeDescriptorImpl desc = cand.descriptor();
+ for (QueryTypeCandidate cand : cands) {
+ QueryTypeDescriptorImpl desc = cand.descriptor();
- if (typesByName.putIfAbsent(new QueryTypeNameKey(space, desc.name()), desc) != null)
- throw new IgniteCheckedException("Type with name '" + desc.name() + "' already indexed " +
- "in cache '" + space + "'.");
+ QueryTypeDescriptorImpl oldDesc = tblTypMap.put(desc.tableName(), desc);
- types.put(typeId, desc);
+ if (oldDesc != null)
+ throw new IgniteException("Duplicate table name [tblName=" + desc.tableName() +
+ ", type1=" + desc.name() + ", type2=" + oldDesc.name() + ']');
- if (altTypeId != null)
- types.put(altTypeId, desc);
+ for (String idxName : desc.indexes().keySet()) {
+ oldDesc = idxTypMap.put(idxName, desc);
- idx.registerType(space, desc);
+ if (oldDesc != null)
+ throw new IgniteException("Duplicate index name [idxName=" + idxName +
+ ", type1=" + desc.name() + ", type2=" + oldDesc.name() + ']');
}
}
- catch (IgniteCheckedException | RuntimeException e) {
- unregisterCache0(space);
- throw e;
+ // Apply pending operation which could have been completed as no-op at this point. There could be only one
+ // in-flight operation for a cache.
+ synchronized (stateMux) {
+ if (disconnected)
+ return;
+
+ for (SchemaOperation op : schemaOps.values()) {
+ if (F.eq(op.proposeMessage().deploymentId(), cctx.dynamicDeploymentId())) {
+ if (op.started()) {
+ SchemaOperationWorker worker = op.manager().worker();
+
+ assert !worker.cacheRegistered();
+
+ if (!worker.nop()) {
+ IgniteInternalFuture fut = worker.future();
+
+ assert fut.isDone();
+
+ if (fut.error() == null) {
+ SchemaAbstractOperation op0 = op.proposeMessage().operation();
+
+ if (op0 instanceof SchemaIndexCreateOperation) {
+ SchemaIndexCreateOperation opCreate = (SchemaIndexCreateOperation)op0;
+
+ QueryTypeDescriptorImpl typeDesc = tblTypMap.get(opCreate.tableName());
+
+ assert typeDesc != null;
+
+ QueryUtils.processDynamicIndexChange(opCreate.indexName(), opCreate.index(),
+ typeDesc);
+ }
+ else if (op0 instanceof SchemaIndexDropOperation) {
+ SchemaIndexDropOperation opDrop = (SchemaIndexDropOperation)op0;
+
+ QueryTypeDescriptorImpl typeDesc = idxTypMap.get(opDrop.indexName());
+
+ assert typeDesc != null;
+
+ QueryUtils.processDynamicIndexChange(opDrop.indexName(), null, typeDesc);
+ }
+ else
+ assert false;
+ }
+ }
+ }
+
+ break;
+ }
+ }
}
+ // Ready to register at this point.
+ registerCache0(space, cctx, cands);
+
// Warn about possible implicit deserialization.
if (!mustDeserializeClss.isEmpty()) {
U.warn(log, "Some classes in query configuration cannot be written in binary format " +
@@ -209,46 +763,41 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
/** {@inheritDoc} */
- @Override public void onKernalStop(boolean cancel) {
- super.onKernalStop(cancel);
-
- if (cancel && idx != null)
- try {
- while (!busyLock.tryBlock(500))
- idx.cancelAllQueries();
-
- return;
- }
- catch (InterruptedException ignored) {
- U.warn(log, "Interrupted while waiting for active queries cancellation.");
+ @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
+ Collection<SchemaOperationClientFuture> futs;
- Thread.currentThread().interrupt();
- }
+ synchronized (stateMux) {
+ disconnected = true;
+ exchangeReady = false;
- busyLock.block();
- }
+ // Clear client futures.
+ futs = new ArrayList<>(schemaCliFuts.values());
- /** {@inheritDoc} */
- @Override public void stop(boolean cancel) throws IgniteCheckedException {
- super.stop(cancel);
+ schemaCliFuts.clear();
- if (idx != null)
- idx.stop();
+ // Clear operations data.
+ activeProposals.clear();
+ schemaOps.clear();
+ }
- U.closeQuiet(qryDetailMetricsEvictTask);
- }
+ // Complete client futures outside of synchonized block because they may have listeners/chains.
+ for (SchemaOperationClientFuture fut : futs)
+ fut.onDone(new SchemaOperationException("Client node is disconnected (operation result is unknown)."));
- /** {@inheritDoc} */
- @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
if (idx != null)
idx.onDisconnected(reconnectFut);
}
/**
+ * Handle cache start. Invoked either from GridCacheProcessor.onKernalStart() method or from exchange worker.
+ * When called for the first time, we initialize topology thus understanding whether current node is coordinator
+ * or not.
+ *
* @param cctx Cache context.
+ * @param schema Index states.
* @throws IgniteCheckedException If failed.
*/
- public void onCacheStart(GridCacheContext cctx) throws IgniteCheckedException {
+ public void onCacheStart(GridCacheContext cctx, QuerySchema schema) throws IgniteCheckedException {
if (idx == null)
return;
@@ -258,45 +807,506 @@ public class GridQueryProcessor extends GridProcessorAdapter {
cctx.shared().database().checkpointReadLock();
try {
- initializeCache(cctx);
+ initializeCache(cctx, schema);
}
finally {
cctx.shared().database().checkpointReadUnlock();
- busyLock.leaveBusy();
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * @param cctx Cache context.
+ */
+ public void onCacheStop(GridCacheContext cctx) {
+ if (idx == null)
+ return;
+
+ if (!busyLock.enterBusy())
+ return;
+
+ try {
+ unregisterCache0(cctx.name());
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * @return Skip field lookup flag.
+ */
+ public boolean skipFieldLookup() {
+ return skipFieldLookup;
+ }
+
+ /**
+ * @param skipFieldLookup Skip field lookup flag.
+ */
+ public void skipFieldLookup(boolean skipFieldLookup) {
+ this.skipFieldLookup = skipFieldLookup;
+ }
+
+ /**
+ * Handle custom discovery message.
+ *
+ * @param msg Message.
+ */
+ public void onDiscovery(SchemaAbstractDiscoveryMessage msg) {
+ IgniteUuid id = msg.id();
+
+ if (!dscoMsgIdHist.add(id)) {
+ U.warn(log, "Received duplicate schema custom discovery message (will ignore) [opId=" +
+ msg.operation().id() + ", msg=" + msg +']');
+
+ return;
+ }
+
+ if (msg instanceof SchemaProposeDiscoveryMessage) {
+ SchemaProposeDiscoveryMessage msg0 = (SchemaProposeDiscoveryMessage)msg;
+
+ boolean exchange = onSchemaProposeDiscovery(msg0);
+
+ msg0.exchange(exchange);
+ }
+ else if (msg instanceof SchemaFinishDiscoveryMessage) {
+ SchemaFinishDiscoveryMessage msg0 = (SchemaFinishDiscoveryMessage)msg;
+
+ onSchemaFinishDiscovery(msg0);
+ }
+ else
+ U.warn(log, "Received unsupported schema custom discovery message (will ignore) [opId=" +
+ msg.operation().id() + ", msg=" + msg +']');
+ }
+
+ /**
+ * Prepare change on started cache.
+ *
+ * @param op Operation.
+ * @return Result: affected type, nop flag, error.
+ */
+ private T3<QueryTypeDescriptorImpl, Boolean, SchemaOperationException> prepareChangeOnStartedCache(
+ SchemaAbstractOperation op) {
+ QueryTypeDescriptorImpl type = null;
+ boolean nop = false;
+ SchemaOperationException err = null;
+
+ String space = op.space();
+
+ if (op instanceof SchemaIndexCreateOperation) {
+ SchemaIndexCreateOperation op0 = (SchemaIndexCreateOperation) op;
+
+ QueryIndex idx = op0.index();
+
+ // Make sure table exists.
+ String tblName = op0.tableName();
+
+ type = type(space, tblName);
+
+ if (type == null)
+ err = new SchemaOperationException(SchemaOperationException.CODE_TABLE_NOT_FOUND, tblName);
+ else {
+ // Make sure that index can be applied to the given table.
+ for (String idxField : idx.getFieldNames()) {
+ if (!type.fields().containsKey(idxField)) {
+ err = new SchemaOperationException(SchemaOperationException.CODE_COLUMN_NOT_FOUND,
+ idxField);
+
+ break;
+ }
+ }
+ }
+
+ // Check conflict with other indexes.
+ if (err == null) {
+ String idxName = op0.index().getName();
+
+ QueryIndexKey idxKey = new QueryIndexKey(space, idxName);
+
+ if (idxs.get(idxKey) != null) {
+ if (op0.ifNotExists())
+ nop = true;
+ else
+ err = new SchemaOperationException(SchemaOperationException.CODE_INDEX_EXISTS, idxName);
+ }
+ }
+ }
+ else if (op instanceof SchemaIndexDropOperation) {
+ SchemaIndexDropOperation op0 = (SchemaIndexDropOperation) op;
+
+ String idxName = op0.indexName();
+
+ QueryIndexDescriptorImpl oldIdx = idxs.get(new QueryIndexKey(space, idxName));
+
+ if (oldIdx == null) {
+ if (op0.ifExists())
+ nop = true;
+ else
+ err = new SchemaOperationException(SchemaOperationException.CODE_INDEX_NOT_FOUND, idxName);
+ }
+ else
+ type = oldIdx.typeDescriptor();
+ }
+ else
+ err = new SchemaOperationException("Unsupported operation: " + op);
+
+ return new T3<>(type, nop, err);
+ }
+
+ /**
+ * Prepare operation on non-started cache.
+ *
+ * @param op Operation.
+ * @param schema Known cache schema.
+ * @return Result: nop flag, error.
+ */
+ private T2<Boolean, SchemaOperationException> prepareChangeOnNotStartedCache(SchemaAbstractOperation op,
+ QuerySchema schema) {
+ boolean nop = false;
+ SchemaOperationException err = null;
+
+ // Build table and index maps.
+ Map<String, QueryEntity> tblMap = new HashMap<>();
+ Map<String, T2<QueryEntity, QueryIndex>> idxMap = new HashMap<>();
+
+ for (QueryEntity entity : schema.entities()) {
+ String tblName = QueryUtils.tableName(entity);
+
+ QueryEntity oldEntity = tblMap.put(tblName, entity);
+
+ if (oldEntity != null) {
+ err = new SchemaOperationException("Invalid schema state (duplicate table found): " + tblName);
+
+ break;
+ }
+
+ for (QueryIndex entityIdx : entity.getIndexes()) {
+ String idxName = QueryUtils.indexName(entity, entityIdx);
+
+ T2<QueryEntity, QueryIndex> oldIdxEntity = idxMap.put(idxName, new T2<>(entity, entityIdx));
+
+ if (oldIdxEntity != null) {
+ err = new SchemaOperationException("Invalid schema state (duplicate index found): " +
+ idxName);
+
+ break;
+ }
+ }
+
+ if (err != null)
+ break;
+ }
+
+ // Now check whether operation can be applied to schema.
+ if (op instanceof SchemaIndexCreateOperation) {
+ SchemaIndexCreateOperation op0 = (SchemaIndexCreateOperation)op;
+
+ String idxName = op0.indexName();
+
+ T2<QueryEntity, QueryIndex> oldIdxEntity = idxMap.get(idxName);
+
+ if (oldIdxEntity == null) {
+ String tblName = op0.tableName();
+
+ QueryEntity oldEntity = tblMap.get(tblName);
+
+ if (oldEntity == null)
+ err = new SchemaOperationException(SchemaOperationException.CODE_TABLE_NOT_FOUND, tblName);
+ else {
+ for (String fieldName : op0.index().getFields().keySet()) {
+ Set<String> oldEntityFields = new HashSet<>(oldEntity.getFields().keySet());
+
+ for (Map.Entry<String, String> alias : oldEntity.getAliases().entrySet()) {
+ oldEntityFields.remove(alias.getKey());
+ oldEntityFields.add(alias.getValue());
+ }
+
+ if (!oldEntityFields.contains(fieldName)) {
+ err = new SchemaOperationException(SchemaOperationException.CODE_COLUMN_NOT_FOUND,
+ fieldName);
+
+ break;
+ }
+ }
+ }
+ }
+ else {
+ if (op0.ifNotExists())
+ nop = true;
+ else
+ err = new SchemaOperationException(SchemaOperationException.CODE_INDEX_EXISTS, idxName);
+ }
+ }
+ else if (op instanceof SchemaIndexDropOperation) {
+ SchemaIndexDropOperation op0 = (SchemaIndexDropOperation)op;
+
+ String idxName = op0.indexName();
+
+ T2<QueryEntity, QueryIndex> oldIdxEntity = idxMap.get(idxName);
+
+ if (oldIdxEntity == null) {
+ if (op0.ifExists())
+ nop = true;
+ else
+ err = new SchemaOperationException(SchemaOperationException.CODE_INDEX_NOT_FOUND, idxName);
+ }
+ }
+ else
+ err = new SchemaOperationException("Unsupported operation: " + op);
+
+ return new T2<>(nop, err);
+ }
+
+ /**
+ * Invoked when coordinator finished ensuring that all participants are ready.
+ *
+ * @param op Operation.
+ * @param err Error (if any).
+ */
+ public void onCoordinatorFinished(SchemaAbstractOperation op, @Nullable SchemaOperationException err) {
+ synchronized (stateMux) {
+ SchemaFinishDiscoveryMessage msg = new SchemaFinishDiscoveryMessage(op, err);
+
+ try {
+ ctx.discovery().sendCustomEvent(msg);
+ }
+ catch (Exception e) {
+ // Failed to send finish message over discovery. This is something unrecoverable.
+ U.warn(log, "Failed to send schema finish discovery message [opId=" + op.id() + ']', e);
+ }
+ }
+ }
+
+ /**
+ * Get current coordinator node.
+ *
+ * @return Coordinator node.
+ */
+ private ClusterNode coordinator() {
+ assert !ctx.clientNode();
+
+ synchronized (stateMux) {
+ if (crd == null) {
+ ClusterNode crd0 = null;
+
+ for (ClusterNode node : ctx.discovery().aliveServerNodes()) {
+ if (crd0 == null || crd0.order() > node.order())
+ crd0 = node;
+ }
+
+ assert crd0 != null;
+
+ crd = crd0;
+ }
+
+ return crd;
+ }
+ }
+
+ /**
+ * Get rid of stale IO message received from other nodes which joined when operation had been in progress.
+ *
+ * @param opId Operation ID.
+ */
+ private void cleanStaleStatusMessages(UUID opId) {
+ Iterator<SchemaOperationStatusMessage> it = pendingMsgs.iterator();
+
+ while (it.hasNext()) {
+ SchemaOperationStatusMessage statusMsg = it.next();
+
+ if (F.eq(opId, statusMsg.operationId())) {
+ it.remove();
+
+ if (log.isDebugEnabled())
+ log.debug("Dropped operation status message because it is already completed [opId=" + opId +
+ ", rmtNode=" + statusMsg.senderNodeId() + ']');
+ }
+ }
+ }
+
+ /**
+ * Apply positive index operation result.
+ *
+ * @param op Operation.
+ * @param type Type descriptor (if available),
+ */
+ public void onLocalOperationFinished(SchemaAbstractOperation op, @Nullable QueryTypeDescriptorImpl type) {
+ synchronized (stateMux) {
+ if (disconnected)
+ return;
+
+ // No need to apply anything to obsolete type.
+ if (type == null || type.obsolete()) {
+ if (log.isDebugEnabled())
+ log.debug("Local operation finished, but type descriptor is either missing or obsolete " +
+ "(will ignore) [opId=" + op.id() + ']');
+
+ return;
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Local operation finished successfully [opId=" + op.id() + ']');
+
+ try {
+ if (op instanceof SchemaIndexCreateOperation) {
+ SchemaIndexCreateOperation op0 = (SchemaIndexCreateOperation)op;
+
+ QueryUtils.processDynamicIndexChange(op0.indexName(), op0.index(), type);
+
+ QueryIndexDescriptorImpl idxDesc = type.index(op0.indexName());
+
+ QueryIndexKey idxKey = new QueryIndexKey(op.space(), op0.indexName());
+
+ idxs.put(idxKey, idxDesc);
+ }
+ else {
+ assert op instanceof SchemaIndexDropOperation;
+
+ SchemaIndexDropOperation op0 = (SchemaIndexDropOperation) op;
+
+ QueryUtils.processDynamicIndexChange(op0.indexName(), null, type);
+
+ QueryIndexKey idxKey = new QueryIndexKey(op.space(), op0.indexName());
+
+ idxs.remove(idxKey);
+ }
+ }
+ catch (IgniteCheckedException e) {
+ U.warn(log, "Failed to finish index operation [opId=" + op.id() + " op=" + op + ']', e);
+ }
+ }
+ }
+
+ /**
+ * Handle node leave.
+ *
+ * @param node Node.
+ */
+ public void onNodeLeave(ClusterNode node) {
+ synchronized (stateMux) {
+ // Clients do not send status messages and are never coordinators.
+ if (ctx.clientNode())
+ return;
+
+ ClusterNode crd0 = coordinator();
+
+ if (F.eq(node.id(), crd0.id())) {
+ crd = null;
+
+ crd0 = coordinator();
+ }
+
+ for (SchemaOperation op : schemaOps.values()) {
+ if (op.started()) {
+ op.manager().onNodeLeave(node.id(), crd0);
+
+ if (crd0.isLocal())
+ unwindPendingMessages(op.id(), op.manager());
+ }
+ }
+ }
+ }
+
+ /**
+ * Process index operation.
+ *
+ * @param op Operation.
+ * @param type Type descriptor.
+ * @param depId Cache deployment ID.
+ * @param cancelTok Cancel token.
+ * @throws SchemaOperationException If failed.
+ */
+ public void processIndexOperationLocal(SchemaAbstractOperation op, QueryTypeDescriptorImpl type, IgniteUuid depId,
+ SchemaIndexOperationCancellationToken cancelTok) throws SchemaOperationException {
+ if (log.isDebugEnabled())
+ log.debug("Started local index operation [opId=" + op.id() + ']');
+
+ String space = op.space();
+
+ GridCacheAdapter cache = ctx.cache().internalCache(op.space());
+
+ if (cache == null || !F.eq(depId, cache.context().dynamicDeploymentId()))
+ throw new SchemaOperationException(SchemaOperationException.CODE_CACHE_NOT_FOUND, op.space());
+
+ try {
+ if (op instanceof SchemaIndexCreateOperation) {
+ SchemaIndexCreateOperation op0 = (SchemaIndexCreateOperation) op;
+
+ QueryIndexDescriptorImpl idxDesc = QueryUtils.createIndexDescriptor(type, op0.index());
+
+ SchemaIndexCacheVisitor visitor =
+ new SchemaIndexCacheVisitorImpl(this, cache.context(), space, op0.tableName(), cancelTok);
+
+ idx.dynamicIndexCreate(space, op0.tableName(), idxDesc, op0.ifNotExists(), visitor);
+ }
+ else if (op instanceof SchemaIndexDropOperation) {
+ SchemaIndexDropOperation op0 = (SchemaIndexDropOperation) op;
+
+ idx.dynamicIndexDrop(space, op0.indexName(), op0.ifExists());
+ }
+ else
+ throw new SchemaOperationException("Unsupported operation: " + op);
+ }
+ catch (Exception e) {
+ if (e instanceof SchemaOperationException)
+ throw (SchemaOperationException)e;
+ else
+ throw new SchemaOperationException("Schema change operation failed: " + e.getMessage(), e);
}
}
/**
+ * Register cache in indexing SPI.
+ *
+ * @param space Space.
* @param cctx Cache context.
+ * @param cands Candidates.
+ * @throws IgniteCheckedException If failed.
*/
- public void onCacheStop(GridCacheContext cctx) {
- if (idx == null)
- return;
+ private void registerCache0(String space, GridCacheContext<?, ?> cctx, Collection<QueryTypeCandidate> cands)
+ throws IgniteCheckedException {
+ synchronized (stateMux) {
+ idx.registerCache(space, cctx, cctx.config());
- if (!busyLock.enterBusy())
- return;
+ try {
+ for (QueryTypeCandidate cand : cands) {
+ QueryTypeIdKey typeId = cand.typeId();
+ QueryTypeIdKey altTypeId = cand.alternativeTypeId();
+ QueryTypeDescriptorImpl desc = cand.descriptor();
- try {
- unregisterCache0(cctx.name());
- }
- finally {
- busyLock.leaveBusy();
- }
- }
+ if (typesByName.putIfAbsent(new QueryTypeNameKey(space, desc.name()), desc) != null)
+ throw new IgniteCheckedException("Type with name '" + desc.name() + "' already indexed " +
+ "in cache '" + space + "'.");
- /**
- * @return Skip field lookup flag.
- */
- public boolean skipFieldLookup() {
- return skipFieldLookup;
- }
+ types.put(typeId, desc);
- /**
- * @param skipFieldLookup Skip field lookup flag.
- */
- public void skipFieldLookup(boolean skipFieldLookup) {
- this.skipFieldLookup = skipFieldLookup;
+ if (altTypeId != null)
+ types.put(altTypeId, desc);
+
+ for (QueryIndexDescriptorImpl idx : desc.indexes0()) {
+ QueryIndexKey idxKey = new QueryIndexKey(space, idx.name());
+
+ QueryIndexDescriptorImpl oldIdx = idxs.putIfAbsent(idxKey, idx);
+
+ if (oldIdx != null) {
+ throw new IgniteException("Duplicate index name [space=" + space +
+ ", idxName=" + idx.name() + ", existingTable=" + oldIdx.typeDescriptor().tableName() +
+ ", table=" + desc.tableName() + ']');
+ }
+ }
+
+ idx.registerType(space, desc);
+ }
+
+ spaces.add(CU.mask(space));
+ }
+ catch (IgniteCheckedException | RuntimeException e) {
+ unregisterCache0(space);
+
+ throw e;
+ }
+ }
}
/**
@@ -307,13 +1317,8 @@ public class GridQueryProcessor extends GridProcessorAdapter {
private void unregisterCache0(String space) {
assert idx != null;
- try {
- idx.unregisterCache(space);
- }
- catch (Exception e) {
- U.error(log, "Failed to clear indexing on cache unregister (will ignore): " + space, e);
- }
- finally {
+ synchronized (stateMux) {
+ // Clear types.
Iterator<Map.Entry<QueryTypeIdKey, QueryTypeDescriptorImpl>> it = types.entrySet().iterator();
while (it.hasNext()) {
@@ -323,9 +1328,79 @@ public class GridQueryProcessor extends GridProcessorAdapter {
it.remove();
typesByName.remove(new QueryTypeNameKey(space, entry.getValue().name()));
+
+ entry.getValue().markObsolete();
}
}
+
+ // Clear indexes.
+ Iterator<Map.Entry<QueryIndexKey, QueryIndexDescriptorImpl>> idxIt = idxs.entrySet().iterator();
+
+ while (idxIt.hasNext()) {
+ Map.Entry<QueryIndexKey, QueryIndexDescriptorImpl> idxEntry = idxIt.next();
+
+ QueryIndexKey idxKey = idxEntry.getKey();
+
+ if (F.eq(space, idxKey.space()))
+ idxIt.remove();
+ }
+
+ // Notify in-progress index operations.
+ for (SchemaOperation op : schemaOps.values()) {
+ if (op.started())
+ op.manager().worker().cancel();
+ }
+
+ // Notify indexing.
+ try {
+ idx.unregisterCache(space);
+ }
+ catch (Exception e) {
+ U.error(log, "Failed to clear indexing on cache unregister (will ignore): " + space, e);
+ }
+
+ spaces.remove(CU.mask(space));
+ }
+ }
+
+ /**
+ * Check whether provided key and value belongs to expected space and table.
+ *
+ * @param cctx Target cache context.
+ * @param expSpace Expected space.
+ * @param expTblName Expected table name.
+ * @param key Key.
+ * @param val Value.
+ * @return {@code True} if this key-value pair belongs to expected space/table, {@code false} otherwise or
+ * if space or table doesn't exist.
+ * @throws IgniteCheckedException If failed.
+ */
+ @SuppressWarnings("ConstantConditions")
+ public boolean belongsToTable(GridCacheContext cctx, String expSpace, String expTblName, KeyCacheObject key,
+ CacheObject val) throws IgniteCheckedException {
+ QueryTypeDescriptorImpl desc = type(expSpace, val);
+
+ if (desc == null)
+ return false;
+
+ if (!F.eq(expTblName, desc.tableName()))
+ return false;
+
+ if (!cctx.cacheObjects().isBinaryObject(val)) {
+ Class<?> valCls = val.value(cctx.cacheObjectContext(), false).getClass();
+
+ if (!desc.valueClass().isAssignableFrom(valCls))
+ return false;
}
+
+ if (!cctx.cacheObjects().isBinaryObject(key)) {
+ Class<?> keyCls = key.value(cctx.cacheObjectContext(), false).getClass();
+
+ if (!desc.keyClass().isAssignableFrom(keyCls))
+ return false;
+ }
+
+ return true;
}
/**
@@ -457,7 +1532,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
if (desc == null)
return;
- idx.store(space, desc, key, partId, val, ver, expirationTime, link);
+ idx.store(space, desc.name(), key, partId, val, ver, expirationTime, link);
}
finally {
busyLock.leaveBusy();
@@ -518,6 +1593,30 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
/**
+ * Gets type descriptor for space by given object's type.
+ *
+ * @param space Space name.
+ * @param val Object to determine type for.
+ * @return Type descriptor.
+ * @throws IgniteCheckedException If failed.
+ */
+ @SuppressWarnings("ConstantConditions")
+ private QueryTypeDescriptorImpl type(@Nullable String space, CacheObject val) throws IgniteCheckedException {
+ CacheObjectContext coctx = cacheObjectContext(space);
+
+ QueryTypeIdKey id;
+
+ boolean binaryVal = ctx.cacheObjects().isBinaryObject(val);
+
+ if (binaryVal)
+ id = new QueryTypeIdKey(space, ctx.cacheObjects().typeId(val));
+ else
+ id = new QueryTypeIdKey(space, val.value(coctx, false).getClass());
+
+ return types.get(id);
+ }
+
+ /**
* @throws IgniteCheckedException If failed.
*/
private void checkEnabled() throws IgniteCheckedException {
@@ -637,9 +1736,9 @@ public class GridQueryProcessor extends GridProcessorAdapter {
@Override public QueryCursor<Cache.Entry<K, V>> applyx() throws IgniteCheckedException {
String type = qry.getType();
- QueryTypeDescriptorImpl typeDesc = type(cctx.name(), type);
+ String typeName = typeName(cctx.name(), type);
- qry.setType(typeDesc.name());
+ qry.setType(typeName);
sendQueryExecutedEvent(
qry.getSql(),
@@ -682,6 +1781,80 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
/**
+ * Entry point for index procedure.
+ *
+ * @param space Space name.
+ * @param tblName Table name.
+ * @param idx Index.
+ * @param ifNotExists When set to {@code true} operation will fail if index already exists.
+ * @return Future completed when index is created.
+ */
+ public IgniteInternalFuture<?> dynamicIndexCreate(String space, String tblName, QueryIndex idx,
+ boolean ifNotExists) {
+ SchemaAbstractOperation op = new SchemaIndexCreateOperation(UUID.randomUUID(), space, tblName, idx, ifNotExists);
+
+ return startIndexOperationDistributed(op);
+ }
+
+ /**
+ * Entry point for index drop procedure
+ *
+ * @param idxName Index name.
+ * @param ifExists When set to {@code true} operation fill fail if index doesn't exists.
+ * @return Future completed when index is created.
+ */
+ public IgniteInternalFuture<?> dynamicIndexDrop(String space, String idxName, boolean ifExists) {
+ SchemaAbstractOperation op = new SchemaIndexDropOperation(UUID.randomUUID(), space, idxName, ifExists);
+
+ return startIndexOperationDistributed(op);
+ }
+
+ /**
+ * Start distributed index change operation.
+ *
+ * @param op Operation.
+ * @return Future.
+ */
+ private IgniteInternalFuture<?> startIndexOperationDistributed(SchemaAbstractOperation op) {
+ SchemaOperationClientFuture fut = new SchemaOperationClientFuture(op.id());
+
+ SchemaOperationClientFuture oldFut = schemaCliFuts.put(op.id(), fut);
+
+ assert oldFut == null;
+
+ try {
+ ctx.discovery().sendCustomEvent(new SchemaProposeDiscoveryMessage(op));
+
+ if (log.isDebugEnabled())
+ log.debug("Sent schema propose discovery message [opId=" + op.id() + ", op=" + op + ']');
+
+ boolean disconnected0;
+
+ synchronized (stateMux) {
+ disconnected0 = disconnected;
+ }
+
+ if (disconnected0) {
+ fut.onDone(new SchemaOperationException("Client node is disconnected (operation result is unknown)."));
+
+ schemaCliFuts.remove(op.id());
+ }
+ }
+ catch (Exception e) {
+ if (e instanceof SchemaOperationException)
+ fut.onDone(e);
+ else {
+ fut.onDone(new SchemaOperationException("Failed to start schema change operation due to " +
+ "unexpected exception [opId=" + op.id() + ", op=" + op + ']', e));
+ }
+
+ schemaCliFuts.remove(op.id());
+ }
+
+ return fut;
+ }
+
+ /**
* @param sqlQry Sql query.
* @param params Params.
*/
@@ -704,14 +1877,15 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
/**
- * @param schema Schema.
+ *
+ * @param space Space name.
* @param sql Query.
* @return {@link PreparedStatement} from underlying engine to supply metadata to Prepared - most likely H2.
*/
- public PreparedStatement prepareNativeStatement(String schema, String sql) throws SQLException {
+ public PreparedStatement prepareNativeStatement(String space, String sql) throws SQLException {
checkxEnabled();
- return idx.prepareNativeStatement(schema, sql);
+ return idx.prepareNativeStatement(space, sql);
}
/**
@@ -838,9 +2012,9 @@ public class GridQueryProcessor extends GridProcessorAdapter {
return executeQuery(GridCacheQueryType.TEXT, clause, cctx,
new IgniteOutClosureX<GridCloseableIterator<IgniteBiTuple<K, V>>>() {
@Override public GridCloseableIterator<IgniteBiTuple<K, V>> applyx() throws IgniteCheckedException {
- QueryTypeDescriptorImpl type = type(space, resType);
+ String typeName = typeName(space, resType);
- return idx.queryLocalText(space, clause, type, filters);
+ return idx.queryLocalText(space, clause, typeName, filters);
}
}, true);
}
@@ -924,20 +2098,35 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
/**
- * Gets type descriptor for space and type name.
+ * Get type descriptor for the given space and table name.
+ * @param space Space.
+ * @param tblName Table name.
+ * @return Type (if any).
+ */
+ @Nullable private QueryTypeDescriptorImpl type(@Nullable String space, String tblName) {
+ for (QueryTypeDescriptorImpl type : types.values()) {
+ if (F.eq(space, type.space()) && F.eq(tblName, type.tableName()))
+ return type;
+ }
+
+ return null;
+ }
+
+ /**
+ * Gets type name for provided space and type name if type is still valid.
*
* @param space Space name.
* @param typeName Type name.
* @return Type descriptor.
* @throws IgniteCheckedException If failed.
*/
- public QueryTypeDescriptorImpl type(@Nullable String space, String typeName) throws IgniteCheckedException {
+ private String typeName(@Nullable String space, String typeName) throws IgniteCheckedException {
QueryTypeDescriptorImpl type = typesByName.get(new QueryTypeNameKey(space, typeName));
if (type == null)
throw new IgniteCheckedException("Failed to find SQL table for type: " + typeName);
- return type;
+ return type.name();
}
/**
@@ -972,7 +2161,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
throw (IgniteCheckedException)err;
}
- catch (CacheException e) {
+ catch (CacheException | IgniteException e) {
err = e;
throw e;
@@ -998,6 +2187,146 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
/**
+ * Send status message to coordinator node.
+ *
+ * @param destNodeId Destination node ID.
+ * @param opId Operation ID.
+ * @param err Error.
+ */
+ public void sendStatusMessage(UUID destNodeId, UUID opId, SchemaOperationException err) {
+ if (log.isDebugEnabled())
+ log.debug("Sending schema operation status message [opId=" + opId + ", crdNode=" + destNodeId +
+ ", err=" + err + ']');
+
+ try {
+ byte[] errBytes = marshalSchemaError(opId, err);
+
+ SchemaOperationStatusMessage msg = new SchemaOperationStatusMessage(opId, errBytes);
+
+ // Messages must go to dedicated schema pool. We cannot push them to query pool because in this case
+ // they could be blocked with other query requests.
+ ctx.io().sendToGridTopic(destNodeId, TOPIC_SCHEMA, msg, SCHEMA_POOL);
+ }
+ catch (IgniteCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send schema status response [opId=" + opId + ", destNodeId=" + destNodeId +
+ ", err=" + e + ']');
+ }
+ }
+
+ /**
+ * Process status message.
+ *
+ * @param msg Status message.
+ */
+ private void processStatusMessage(SchemaOperationStatusMessage msg) {
+ synchronized (stateMux) {
+ if (completedOpIds.contains(msg.operationId())) {
+ // Received message from a node which joined topology in the middle of operation execution.
+ if (log.isDebugEnabled())
+ log.debug("Received status message for completed operation (will ignore) [" +
+ "opId=" + msg.operationId() + ", sndNodeId=" + msg.senderNodeId() + ']');
+
+ return;
+ }
+
+ UUID opId = msg.operationId();
+
+ SchemaProposeDiscoveryMessage proposeMsg = activeProposals.get(opId);
+
+ if (proposeMsg != null) {
+ SchemaOperation op = schemaOps.get(proposeMsg.schemaKey());
+
+ if (op != null && F.eq(op.id(), opId) && op.started() && coordinator().isLocal()) {
+ if (log.isDebugEnabled())
+ log.debug("Received status message [opId=" + msg.operationId() +
+ ", sndNodeId=" + msg.senderNodeId() + ']');
+
+ op.manager().onNodeFinished(msg.senderNodeId(), unmarshalSchemaError(msg.errorBytes()));
+
+ return;
+ }
+ }
+
+ // Put to pending set if operation is not visible/ready yet.
+ pendingMsgs.add(msg);
+
+ if (log.isDebugEnabled())
+ log.debug("Received status message (added to pending set) [opId=" + msg.operationId() +
+ ", sndNodeId=" + msg.senderNodeId() + ']');
+ }
+ }
+
+ /**
+ * Unwind pending messages for particular operation.
+ *
+ * @param opId Operation ID.
+ * @param mgr Manager.
+ */
+ private void unwindPendingMessages(UUID opId, SchemaOperationManager mgr) {
+ assert Thread.holdsLock(stateMux);
+
+ Iterator<SchemaOperationStatusMessage> it = pendingMsgs.iterator();
+
+ while (it.hasNext()) {
+ SchemaOperationStatusMessage msg = it.next();
+
+ if (F.eq(msg.operationId(), opId)) {
+ mgr.onNodeFinished(msg.senderNodeId(), unmarshalSchemaError(msg.errorBytes()));
+
+ it.remove();
+ }
+ }
+ }
+
+ /**
+ * Marshal schema error.
+ *
+ * @param err Error.
+ * @return Error bytes.
+ */
+ @Nullable private byte[] marshalSchemaError(UUID opId, @Nullable SchemaOperationException err) {
+ if (err == null)
+ return null;
+
+ try {
+ return U.marshal(marsh, err);
+ }
+ catch (Exception e) {
+ U.warn(log, "Failed to marshal schema operation error [opId=" + opId + ", err=" + err + ']', e);
+
+ try {
+ return U.marshal(marsh, new SchemaOperationException("Operation failed, but error cannot be " +
+ "serialized (see local node log for more details) [opId=" + opId + ", nodeId=" +
+ ctx.localNodeId() + ']'));
+ }
+ catch (Exception e0) {
+ assert false; // Impossible situation.
+
+ return null;
+ }
+ }
+ }
+
+ /**
+ * Unmarshal schema error.
+ *
+ * @param errBytes Error bytes.
+ * @return Error.
+ */
+ @Nullable private SchemaOperationException unmarshalSchemaError(@Nullable byte[] errBytes) {
+ if (errBytes == null)
+ return null;
+
+ try {
+ return U.unmarshal(marsh, errBytes, U.resolveClassLoader(ctx.config()));
+ }
+ catch (Exception e) {
+ return new SchemaOperationException("Operation failed, but error cannot be deserialized.");
+ }
+ }
+
+ /**
* @param ver Version.
*/
public static void setRequestAffinityTopologyVersion(AffinityTopologyVersion ver) {
@@ -1010,4 +2339,160 @@ public class GridQueryProcessor extends GridProcessorAdapter {
public static AffinityTopologyVersion getRequestAffinityTopologyVersion() {
return requestTopVer.get();
}
+
+ /**
+ * Schema operation.
+ */
+ private class SchemaOperation {
+ /** Original propose msg. */
+ private final SchemaProposeDiscoveryMessage proposeMsg;
+
+ /** Next schema operation. */
+ private SchemaOperation next;
+
+ /** Operation manager. */
+ private SchemaOperationManager mgr;
+
+ /** Finish message. */
+ private SchemaFinishDiscoveryMessage finishMsg;
+
+ /** Finish guard. */
+ private final AtomicBoolean finishGuard = new AtomicBoolean();
+
+ /**
+ * Constructor.
+ *
+ * @param proposeMsg Original propose message.
+ */
+ public SchemaOperation(SchemaProposeDiscoveryMessage proposeMsg) {
+ this.proposeMsg = proposeMsg;
+ }
+
+ /**
+ * @return Operation ID.
+ */
+ public UUID id() {
+ return proposeMsg.operation().id();
+ }
+
+ /**
+ * @return Original propose message.
+ */
+ public SchemaProposeDiscoveryMessage proposeMessage() {
+ return proposeMsg;
+ }
+
+ /**
+ * @return Next schema operation.
+ */
+ @Nullable public SchemaOperation next() {
+ return next;
+ }
+
+ /**
+ * @param next Next schema operation.
+ */
+ public void next(SchemaOperation next) {
+ this.next = next;
+ }
+
+ /**
+ * @param finishMsg Finish message.
+ */
+ public void finishMessage(SchemaFinishDiscoveryMessage finishMsg) {
+ this.finishMsg = finishMsg;
+ }
+
+ /**
+ * @return {@code True} if finish request already received.
+ */
+ public boolean hasFinishMessage() {
+ return finishMsg != null;
+ }
+
+ /**
+ * Handle finish message.
+ */
+ @SuppressWarnings("unchecked")
+ public void doFinish() {
+ assert started();
+
+ if (!finishGuard.compareAndSet(false, true))
+ return;
+
+ final UUID opId = id();
+ final SchemaKey key = proposeMsg.schemaKey();
+
+ // Operation might be still in progress on client nodes which are not tracked by coordinator,
+ // so we chain to operation future instead of doing synchronous unwind.
+ mgr.worker().future().listen(new IgniteInClosure<IgniteInternalFuture>() {
+ @Override public void apply(IgniteInternalFuture fut) {
+ synchronized (stateMux) {
+ SchemaOperation op = schemaOps.remove(key);
+
+ assert op != null;
+ assert F.eq(op.id(), opId);
+
+ // Chain to the next operation (if any).
+ final SchemaOperation nextOp = op.next();
+
+ if (nextOp != null) {
+ schemaOps.put(key, nextOp);
+
+ if (log.isDebugEnabled())
+ log.debug("Next schema change operation started [opId=" + nextOp.id() + ']');
+
+ assert !nextOp.started();
+
+ // Cannot execute operation synchronously because it may cause starvation in exchange
+ // thread under load. Hence, moving short-lived operation to separate worker.
+ new IgniteThread(ctx.igniteInstanceName(), "schema-circuit-breaker-" + op.id(),
+ new Runnable() {
+ @Override public void run() {
+ onSchemaPropose(nextOp.proposeMessage());
+ }
+ }).start();
+ }
+ }
+ }
+ });
+ }
+
+ /**
+ * Unwind operation queue and get tail operation.
+ *
+ * @return Tail operation.
+ */
+ public SchemaOperation unwind() {
+ if (next == null)
+ return this;
+ else
+ return next.unwind();
+ }
+
+ /**
+ * Whether operation started.
+ *
+ * @return {@code True} if started.
+ */
+ public boolean started() {
+ return mgr != null;
+ }
+
+ /**
+ * @return Operation manager.
+ */
+ public SchemaOperationManager manager() {
+ return mgr;
+ }
+
+ /**
+ * @param mgr Operation manager.
+ */
+ public void manager(SchemaOperationManager mgr) {
+ assert this.mgr == null;
+
+ this.mgr = mgr;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryTypeDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryTypeDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryTypeDescriptor.java
index 44c41c1..b7434d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryTypeDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryTypeDescriptor.java
@@ -81,6 +81,13 @@ public interface GridQueryTypeDescriptor {
public Map<String, GridQueryIndexDescriptor> indexes();
/**
+ * Get text index for this type (if any).
+ *
+ * @return Text index or {@code null}.
+ */
+ public GridQueryIndexDescriptor textIndex();
+
+ /**
* Gets value class.
*
* @return Value class.
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/IgniteSQLException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/IgniteSQLException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/IgniteSQLException.java
index b15007e..0666493 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/IgniteSQLException.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/IgniteSQLException.java
@@ -81,6 +81,13 @@ public class IgniteSQLException extends IgniteException {
}
/**
+ * @return Ignite SQL error code.
+ */
+ public int statusCode() {
+ return statusCode;
+ }
+
+ /**
* @return JDBC exception containing details from this instance.
*/
public SQLException toJdbcException() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexDescriptorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexDescriptorImpl.java
index 9d2d20c..1b85af5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexDescriptorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexDescriptorImpl.java
@@ -17,7 +17,9 @@
package org.apache.ignite.internal.processors.query;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.QueryIndexType;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -45,22 +47,48 @@ public class QueryIndexDescriptorImpl implements GridQueryIndexDescriptor {
/** Fields which should be indexed in descending order. */
private Collection<String> descendings;
+ /** Type descriptor. */
+ @GridToStringExclude
+ private final QueryTypeDescriptorImpl typDesc;
+
+ /** Index name. */
+ private final String name;
+
/** */
private final QueryIndexType type;
/** */
- private int inlineSize;
+ private final int inlineSize;
/**
+ * Constructor.
+ *
+ * @param typDesc Type descriptor.
+ * @param name Index name.
* @param type Type.
+ * @param inlineSize Inline size.
*/
- public QueryIndexDescriptorImpl(QueryIndexType type, int inlineSize) {
+ public QueryIndexDescriptorImpl(QueryTypeDescriptorImpl typDesc, String name, QueryIndexType type, int inlineSize) {
assert type != null;
+ this.typDesc = typDesc;
+ this.name = name;
this.type = type;
this.inlineSize = inlineSize;
}
+ /**
+ * @return Type descriptor.
+ */
+ public QueryTypeDescriptorImpl typeDescriptor() {
+ return typDesc;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String name() {
+ return name;
+ }
+
/** {@inheritDoc} */
@Override public Collection<String> fields() {
Collection<String> res = new ArrayList<>(fields.size());
@@ -87,8 +115,14 @@ public class QueryIndexDescriptorImpl implements GridQueryIndexDescriptor {
* @param field Field name.
* @param orderNum Field order number in this index.
* @param descending Sort order.
+ * @return This instance for chaining.
+ * @throws IgniteCheckedException If failed.
*/
- public void addField(String field, int orderNum, boolean descending) {
+ public QueryIndexDescriptorImpl addField(String field, int orderNum, boolean descending)
+ throws IgniteCheckedException {
+ if (!typDesc.hasField(field))
+ throw new IgniteCheckedException("Field not found: " + field);
+
fields.add(new T2<>(field, orderNum));
if (descending) {
@@ -97,6 +131,8 @@ public class QueryIndexDescriptorImpl implements GridQueryIndexDescriptor {
descendings.add(field);
}
+
+ return this;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexKey.java
new file mode 100644
index 0000000..f580111
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexKey.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.io.Serializable;
+
+/**
+ * Index key.
+ */
+public class QueryIndexKey implements Serializable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Space. */
+ private final String space;
+
+ /** Name. */
+ private final String name;
+
+ /**
+ * Constructor.
+ *
+ * @param space Space.
+ * @param name Name.
+ */
+ public QueryIndexKey(String space, String name) {
+ this.space = space;
+ this.name = name;
+ }
+
+ /**
+ * @return Space.
+ */
+ public String space() {
+ return space;
+ }
+
+ /**
+ * @return Name.
+ */
+ public String name() {
+ return name;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return 31 * (space != null ? space.hashCode() : 0) + (name != null ? name.hashCode() : 0);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ QueryIndexKey other = (QueryIndexKey)o;
+
+ return F.eq(name, other.name) && F.eq(space, other.space);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(QueryIndexKey.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QuerySchema.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QuerySchema.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QuerySchema.java
new file mode 100644
index 0000000..395f077
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QuerySchema.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation;
+import org.apache.ignite.internal.processors.query.schema.operation.SchemaIndexCreateOperation;
+import org.apache.ignite.internal.processors.query.schema.operation.SchemaIndexDropOperation;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Dynamic cache schema.
+ */
+public class QuerySchema implements Serializable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Query entities. */
+ private final Collection<QueryEntity> entities = new LinkedList<>();
+
+ /** Mutex for state synchronization. */
+ private final Object mux = new Object();
+
+ /**
+ * Default constructor.
+ */
+ public QuerySchema() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param entities Query entities.
+ */
+ public QuerySchema(Collection<QueryEntity> entities) {
+ assert entities != null;
+
+ for (QueryEntity qryEntity : entities)
+ this.entities.add(new QueryEntity(qryEntity));
+ }
+
+ /**
+ * Copy object.
+ *
+ * @return Copy.
+ */
+ public QuerySchema copy() {
+ synchronized (mux) {
+ QuerySchema res = new QuerySchema();
+
+ for (QueryEntity qryEntity : entities)
+ res.entities.add(new QueryEntity(qryEntity));
+
+ return res;
+ }
+ }
+
+ /**
+ * Process finish message.
+ *
+ * @param msg Message.
+ */
+ public void finish(SchemaFinishDiscoveryMessage msg) {
+ synchronized (mux) {
+ SchemaAbstractOperation op = msg.operation();
+
+ if (op instanceof SchemaIndexCreateOperation) {
+ SchemaIndexCreateOperation op0 = (SchemaIndexCreateOperation)op;
+
+ for (QueryEntity entity : entities) {
+ String tblName = QueryUtils.tableName(entity);
+
+ if (F.eq(tblName, op0.tableName())) {
+ boolean exists = false;
+
+ for (QueryIndex idx : entity.getIndexes()) {
+ if (F.eq(idx.getName(), op0.indexName())) {
+ exists = true;
+
+ break;
+ }
+ }
+
+ if (!exists) {
+ List<QueryIndex> idxs = new ArrayList<>(entity.getIndexes());
+
+ idxs.add(op0.index());
+
+ entity.clearIndexes();
+ entity.setIndexes(idxs);
+ }
+
+ break;
+ }
+ }
+ }
+ else {
+ assert op instanceof SchemaIndexDropOperation;
+
+ SchemaIndexDropOperation op0 = (SchemaIndexDropOperation)op;
+
+ for (QueryEntity entity : entities) {
+ Collection<QueryIndex> idxs = entity.getIndexes();
+
+ QueryIndex victim = null;
+
+ for (QueryIndex idx : idxs) {
+ if (F.eq(idx.getName(), op0.indexName())) {
+ victim = idx;
+
+ break;
+ }
+ }
+
+ if (victim != null) {
+ List<QueryIndex> newIdxs = new ArrayList<>(entity.getIndexes());
+
+ newIdxs.remove(victim);
+
+ entity.clearIndexes();
+ entity.setIndexes(idxs);
+
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * @return Query entities.
+ */
+ public Collection<QueryEntity> entities() {
+ synchronized (mux) {
+ return new ArrayList<>(entities);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(QuerySchema.class, this);
+ }
+}