You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/03/15 12:50:13 UTC
ignite git commit: WIP on single-threaded index process manager.
Repository: ignite
Updated Branches:
refs/heads/ignite-4565-ddl a371d251a -> 5a7afc7be
WIP on single-threaded index process manager.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5a7afc7b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5a7afc7b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5a7afc7b
Branch: refs/heads/ignite-4565-ddl
Commit: 5a7afc7be6ee05cc385b3042deb76b72bd6fea4c
Parents: a371d25
Author: devozerov <vo...@gridgain.com>
Authored: Wed Mar 15 15:49:59 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Wed Mar 15 15:49:59 2017 +0300
----------------------------------------------------------------------
.../communication/GridIoMessageFactory.java | 10 +-
.../processors/query/GridQueryProcessor.java | 228 +++++++++++++++++++
.../query/ddl/IndexOperationStatusMessage.java | 149 ++++++++++++
3 files changed, 379 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5a7afc7b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index e1aec02..7e56ee6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -136,8 +136,7 @@ import org.apache.ignite.internal.processors.igfs.IgfsFragmentizerResponse;
import org.apache.ignite.internal.processors.igfs.IgfsSyncMessage;
import org.apache.ignite.internal.processors.marshaller.MissingMappingRequestMessage;
import org.apache.ignite.internal.processors.marshaller.MissingMappingResponseMessage;
-import org.apache.ignite.internal.processors.query.ddl.DdlOperationNodeResult;
-import org.apache.ignite.internal.processors.query.ddl.DdlOperationResult;
+import org.apache.ignite.internal.processors.query.ddl.IndexOperationStatusMessage;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest;
@@ -859,12 +858,7 @@ public class GridIoMessageFactory implements MessageFactory {
break;
case -48:
- msg = new DdlOperationResult();
-
- break;
-
- case -49:
- msg = new DdlOperationNodeResult();
+ msg = new IndexOperationStatusMessage();
break;
http://git-wip-us.apache.org/repos/asf/ignite/blob/5a7afc7b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 0fb77eb..ad978c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -22,12 +22,14 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.cache.Cache;
@@ -35,6 +37,7 @@ import javax.cache.CacheException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.binary.Binarylizable;
import org.apache.ignite.cache.CacheTypeMetadata;
import org.apache.ignite.cache.QueryEntity;
@@ -42,10 +45,12 @@ import org.apache.ignite.cache.QueryIndex;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.SqlQuery;
+import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.events.CacheQueryExecutedEvent;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
@@ -60,6 +65,7 @@ import org.apache.ignite.internal.processors.query.ddl.AbstractIndexOperation;
import org.apache.ignite.internal.processors.query.ddl.CreateIndexOperation;
import org.apache.ignite.internal.processors.query.ddl.DropIndexOperation;
import org.apache.ignite.internal.processors.query.ddl.IndexAcceptDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.ddl.IndexOperationStatusMessage;
import org.apache.ignite.internal.processors.query.ddl.IndexProposeDiscoveryMessage;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.GridSpinBusyLock;
@@ -69,6 +75,7 @@ import org.apache.ignite.internal.util.lang.GridClosureException;
import org.apache.ignite.internal.util.lang.IgniteOutClosureX;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.spi.indexing.IndexingQueryFilter;
@@ -1231,4 +1238,225 @@ public class GridQueryProcessor extends GridProcessorAdapter {
public static AffinityTopologyVersion getRequestAffinityTopologyVersion() {
return requestTopVer.get();
}
+
+ /**
+ * Worker which manages overall dynamic index creation process.
+ */
+ private class DynamicIndexManagerWorker extends GridWorker {
+ /** Tasks queue. */
+ private final LinkedBlockingQueue<DynamicIndexTask> tasks = new LinkedBlockingQueue<>();
+
+ /** Alive nodes. */
+ private Collection<ClusterNode> aliveNodes;
+
+ /** Local node ID. */
+ private UUID locNodeId;
+
+ /** Coordinator node. */
+ private ClusterNode crdNode;
+
+ /**
+ * Constructor.
+ *
+ * @param igniteInstanceName Ignite instance name.
+ * @param log Logger.
+ */
+ public DynamicIndexManagerWorker(String igniteInstanceName, IgniteLogger log) {
+ super(igniteInstanceName, "dynamic-index-manager-worker", log);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+ // TODO Find coordinator.
+
+ // TODO: Start processing tasks
+ }
+
+ /**
+ * Submit new task.
+ *
+ * @param task Task.
+ */
+ public void submit(DynamicIndexTask task) {
+ tasks.add(task);
+ }
+
+ /**
+  * Update topology in response to node leave event.
+  * <p>
+  * Takes a fresh snapshot of alive server nodes, works out which previously known nodes are no longer
+  * present, and re-elects the coordinator (the alive node with the smallest order) when there is no
+  * coordinator yet or the current one has left.
+  */
+ private void updateTopology() {
+     // Copy into a HashSet so that membership checks are cheap and the snapshot is owned by this worker.
+     Collection<ClusterNode> curNodes = new HashSet<>(ctx.discovery().aliveServerNodes());
+
+     Collection<ClusterNode> leftNodes = new HashSet<>();
+
+     // A node has left if it was in the previous snapshot but is missing from the current one.
+     // On the first call there is no previous snapshot, hence nothing could have left.
+     if (aliveNodes != null) {
+         for (ClusterNode prevNode : aliveNodes) {
+             if (!curNodes.contains(prevNode))
+                 leftNodes.add(prevNode);
+         }
+     }
+
+     aliveNodes = curNodes;
+
+     // Only report a coordinator change when one actually happens.
+     boolean crdChanged = false;
+
+     if (crdNode == null || leftNodes.contains(crdNode)) {
+         ClusterNode newCrdNode = null;
+
+         // Coordinator is the alive server node with the minimal order.
+         for (ClusterNode aliveNode : aliveNodes) {
+             if (newCrdNode == null || newCrdNode.order() > aliveNode.order())
+                 newCrdNode = aliveNode;
+         }
+
+         crdChanged = newCrdNode != crdNode;
+
+         crdNode = newCrdNode;
+     }
+
+     for (ClusterNode leftNode : leftNodes) {
+         // TODO: Process left nodes
+     }
+
+     if (crdChanged) {
+         // TODO: Process new coordinator.
+     }
+ }
+ }
+
+ /**
+ * Marker interface for index-related tasks.
+ */
+ private static interface DynamicIndexTask {
+ // No-op.
+ }
+
+ /**
+ * Change index task.
+ */
+ private static class ChangeIndexTask implements DynamicIndexTask {
+ /** Operation. */
+ private final AbstractIndexOperation op;
+
+ /**
+ * Constructor.
+ *
+ * @param op Operation.
+ */
+ public ChangeIndexTask(AbstractIndexOperation op) {
+ this.op = op;
+ }
+
+ /**
+ * @return Operation.
+ */
+ public AbstractIndexOperation operation() {
+ return op;
+ }
+ }
+
+ /**
+ * Node leave task.
+ */
+ private static class NodeLeaveTask implements DynamicIndexTask {
+ /** Node ID. */
+ private final UUID nodeId;
+
+ /**
+ * Constructor.
+ *
+ * @param nodeId Node ID.
+ */
+ public NodeLeaveTask(UUID nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ /**
+ * @return Node ID.
+ */
+ public UUID nodeId() {
+ return nodeId;
+ }
+ }
+
+ /**
+ * Type removal task (either due to cache stop or due to type undeploy).
+ */
+ private static class TypeRemoveTask implements DynamicIndexTask {
+ /** Type descriptor. */
+ private final QueryTypeDescriptorImpl typeDesc;
+
+ /**
+ * Constructor.
+ *
+ * @param typeDesc Type descriptor.
+ */
+ public TypeRemoveTask(QueryTypeDescriptorImpl typeDesc) {
+ this.typeDesc = typeDesc;
+ }
+
+ /**
+ * @return Type descriptor.
+ */
+ public QueryTypeDescriptorImpl typeDescriptor() {
+ return typeDesc;
+ }
+ }
+
+ /**
+ * Operation status message received.
+ */
+ private static class OperationStatusTask implements DynamicIndexTask {
+ /** Node ID. */
+ private final UUID nodeId;
+
+ /** Operation ID. */
+ private final UUID opId;
+
+ /** Error message. */
+ private final String errMsg;
+
+ /**
+ * Constructor.
+ *
+ * @param nodeId Node ID.
+ * @param opId Operation ID.
+ * @param errMsg Error message.
+ */
+ public OperationStatusTask(UUID nodeId, UUID opId, String errMsg) {
+ this.nodeId = nodeId;
+ this.opId = opId;
+ this.errMsg = errMsg;
+ }
+
+ /**
+ * @return Node ID.
+ */
+ public UUID nodeId() {
+ return nodeId;
+ }
+
+ /**
+ * @return Operation ID.
+ */
+ public UUID operationId() {
+ return opId;
+ }
+
+ /**
+ * @return Error message (if any).
+ */
+ @Nullable public String errorMessage() {
+ return errMsg;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5a7afc7b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationStatusMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationStatusMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationStatusMessage.java
new file mode 100644
index 0000000..bfec530
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationStatusMessage.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.ddl;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+/**
+ * Message with index operation status. Sent from participant to coordinator when index creation is completed or
+ * when coordinator changes.
+ * <p>
+ * Carries two fields: the operation ID and an error message. Instances are created by the message factory
+ * through the default constructor and then populated by {@link #readFrom(ByteBuffer, MessageReader)}.
+ */
+public class IndexOperationStatusMessage implements Message {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Operation ID. */
+ private UUID opId;
+
+ /** Error message. */
+ private String errMsg;
+
+ /**
+ * Default constructor. Leaves all fields unset; they are filled in by
+ * {@link #readFrom(ByteBuffer, MessageReader)}.
+ */
+ public IndexOperationStatusMessage() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param opId Operation ID.
+ * @param errMsg Error message.
+ */
+ public IndexOperationStatusMessage(UUID opId, String errMsg) {
+ this.opId = opId;
+ this.errMsg = errMsg;
+ }
+
+ /**
+ * @return Operation ID.
+ */
+ public UUID operationId() {
+ return opId;
+ }
+
+ /**
+ * @return Error message. NOTE(review): presumably {@code null} when the operation succeeded - confirm
+ * against the sender.
+ */
+ public String errorMessage() {
+ return errMsg;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Writes the header once, then the fields in the order {@code opId}, {@code errMsg}. Returns {@code false}
+ * as soon as the writer reports that the header or a field could not be written.
+ */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ // Cases intentionally fall through: the switch jumps to the field indicated by the writer state and
+ // then continues with all following fields, so a call that starts at a later state skips fields
+ // already written.
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeUuid("opId", opId))
+ return false;
+
+ writer.incrementState();
+
+ case 1:
+ if (!writer.writeString("errMsg", errMsg))
+ return false;
+
+ writer.incrementState();
+ }
+
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Reads the fields in the same order they are written: {@code opId}, then {@code errMsg}. Returns
+ * {@code false} as soon as the reader reports that the last read did not complete.
+ */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ // Cases intentionally fall through, mirroring writeTo(): reading starts at the field indicated by
+ // the reader state and continues with all following fields.
+ switch (reader.state()) {
+ case 0:
+ opId = reader.readUuid("opId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 1:
+ errMsg = reader.readString("errMsg");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+ }
+
+ return reader.afterMessageRead(IndexOperationStatusMessage.class);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return Type code {@code -48}; the same value is used for this class in the {@code GridIoMessageFactory}
+ * switch.
+ */
+ @Override public byte directType() {
+ return -48;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return {@code 2}, the number of fields handled by {@code writeTo} and {@code readFrom}
+ * ({@code opId} and {@code errMsg}).
+ */
+ @Override public byte fieldsCount() {
+ return 2;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IndexOperationStatusMessage.class, this);
+ }
+}