You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/03/15 12:50:13 UTC

ignite git commit: WIP on single-threaded index process manager.

Repository: ignite
Updated Branches:
  refs/heads/ignite-4565-ddl a371d251a -> 5a7afc7be


WIP on single-threaded index process manager.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5a7afc7b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5a7afc7b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5a7afc7b

Branch: refs/heads/ignite-4565-ddl
Commit: 5a7afc7be6ee05cc385b3042deb76b72bd6fea4c
Parents: a371d25
Author: devozerov <vo...@gridgain.com>
Authored: Wed Mar 15 15:49:59 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Wed Mar 15 15:49:59 2017 +0300

----------------------------------------------------------------------
 .../communication/GridIoMessageFactory.java     |  10 +-
 .../processors/query/GridQueryProcessor.java    | 228 +++++++++++++++++++
 .../query/ddl/IndexOperationStatusMessage.java  | 149 ++++++++++++
 3 files changed, 379 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5a7afc7b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index e1aec02..7e56ee6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -136,8 +136,7 @@ import org.apache.ignite.internal.processors.igfs.IgfsFragmentizerResponse;
 import org.apache.ignite.internal.processors.igfs.IgfsSyncMessage;
 import org.apache.ignite.internal.processors.marshaller.MissingMappingRequestMessage;
 import org.apache.ignite.internal.processors.marshaller.MissingMappingResponseMessage;
-import org.apache.ignite.internal.processors.query.ddl.DdlOperationNodeResult;
-import org.apache.ignite.internal.processors.query.ddl.DdlOperationResult;
+import org.apache.ignite.internal.processors.query.ddl.IndexOperationStatusMessage;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest;
@@ -859,12 +858,7 @@ public class GridIoMessageFactory implements MessageFactory {
                 break;
 
             case -48:
-                msg = new DdlOperationResult();
-
-                break;
-
-            case -49:
-                msg = new DdlOperationNodeResult();
+                msg = new IndexOperationStatusMessage();
 
                 break;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5a7afc7b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 0fb77eb..ad978c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -22,12 +22,14 @@ import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import javax.cache.Cache;
@@ -35,6 +37,7 @@ import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.binary.Binarylizable;
 import org.apache.ignite.cache.CacheTypeMetadata;
 import org.apache.ignite.cache.QueryEntity;
@@ -42,10 +45,12 @@ import org.apache.ignite.cache.QueryIndex;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.SqlQuery;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.events.CacheQueryExecutedEvent;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
@@ -60,6 +65,7 @@ import org.apache.ignite.internal.processors.query.ddl.AbstractIndexOperation;
 import org.apache.ignite.internal.processors.query.ddl.CreateIndexOperation;
 import org.apache.ignite.internal.processors.query.ddl.DropIndexOperation;
 import org.apache.ignite.internal.processors.query.ddl.IndexAcceptDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.ddl.IndexOperationStatusMessage;
 import org.apache.ignite.internal.processors.query.ddl.IndexProposeDiscoveryMessage;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
@@ -69,6 +75,7 @@ import org.apache.ignite.internal.util.lang.GridClosureException;
 import org.apache.ignite.internal.util.lang.IgniteOutClosureX;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
@@ -1231,4 +1238,225 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     public static AffinityTopologyVersion getRequestAffinityTopologyVersion() {
         return requestTopVer.get();
     }
+
+    /**
+     * Worker which manages overall dynamic index creation process.
+     */
+    private class DynamicIndexManagerWorker extends GridWorker {
+        /** Queue of pending tasks; filled by {@link #submit(DynamicIndexTask)}. */
+        private final LinkedBlockingQueue<DynamicIndexTask> tasks = new LinkedBlockingQueue<>();
+
+        /** Server nodes seen by last {@link #updateTopology()} call, {@code null} before the first call. */
+        private Collection<ClusterNode> aliveNodes;
+
+        /** Local node ID. */
+        private UUID locNodeId;
+
+        /** Coordinator node: alive server node with the smallest order, if any. */
+        private ClusterNode crdNode;
+
+        /**
+         * Constructor.
+         *
+         * @param igniteInstanceName Ignite instance name.
+         * @param log Logger.
+         */
+        public DynamicIndexManagerWorker(String igniteInstanceName, IgniteLogger log) {
+            super(igniteInstanceName, "dynamic-index-manager-worker", log);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+            // TODO Find coordinator.
+
+            // TODO: Start processing tasks
+        }
+
+        /**
+         * Submit new task.
+         *
+         * @param task Task.
+         */
+        public void submit(DynamicIndexTask task) {
+            tasks.add(task);
+        }
+
+        /**
+         * Refreshes the alive nodes snapshot, detects left nodes and re-elects coordinator if it has left.
+         */
+        private void updateTopology() {
+            boolean crdChanged = false;
+            Collection<ClusterNode> leftNodes = new HashSet<>();
+
+            if (aliveNodes == null) {
+                // First call: take the initial snapshot and elect the node with the smallest order.
+                aliveNodes = new HashSet<>(ctx.discovery().aliveServerNodes());
+
+                for (ClusterNode aliveNode : aliveNodes) {
+                    if (crdNode == null || crdNode.order() > aliveNode.order()) {
+                        crdNode = aliveNode;
+
+                        crdChanged = true;
+                    }
+                }
+            }
+            else {
+                Collection<ClusterNode> aliveNodes0 = new HashSet<>(ctx.discovery().aliveServerNodes());
+                // Left nodes are the ones present in the previous snapshot but missing from the new one.
+                for (ClusterNode prevNode : aliveNodes) {
+                    if (!aliveNodes0.contains(prevNode))
+                        leftNodes.add(prevNode);
+                }
+
+                aliveNodes = aliveNodes0;
+
+                if (leftNodes.contains(crdNode)) {
+                    // Coordinator has left: pick the oldest remaining node. The change is reported
+                    // even when there is no successor (no alive server nodes remain).
+                    crdNode = null;
+                    crdChanged = true;
+
+                    for (ClusterNode aliveNode : aliveNodes) {
+                        if (crdNode == null || crdNode.order() > aliveNode.order())
+                            crdNode = aliveNode;
+                    }
+                }
+            }
+
+            for (ClusterNode leftNode : leftNodes) {
+                // TODO: Process left nodes
+            }
+
+            if (crdChanged) {
+                // TODO: Process new coordinator.
+            }
+        }
+    }
+
+    /**
+     * Marker interface for index-related tasks.
+     */
+    private interface DynamicIndexTask {
+        // Intentionally empty.
+    }
+
+    /**
+     * Change index task.
+     */
+    private static class ChangeIndexTask implements DynamicIndexTask {
+        /** Index operation carried by this task. */
+        private final AbstractIndexOperation idxOp;
+
+        /**
+         * Creates a task wrapping the given index operation.
+         *
+         * @param op Index operation to carry.
+         */
+        public ChangeIndexTask(AbstractIndexOperation op) {
+            idxOp = op;
+        }
+
+        /**
+         * @return Index operation passed to the constructor.
+         */
+        public AbstractIndexOperation operation() {
+            return idxOp;
+        }
+    }
+
+    /**
+     * Node leave task.
+     */
+    private static class NodeLeaveTask implements DynamicIndexTask {
+        /** ID of the node this task refers to. */
+        private final UUID id;
+
+        /**
+         * Creates a task for the given node.
+         *
+         * @param nodeId ID of the node.
+         */
+        public NodeLeaveTask(UUID nodeId) {
+            id = nodeId;
+        }
+
+        /**
+         * @return Node ID passed to the constructor.
+         */
+        public UUID nodeId() {
+            return id;
+        }
+    }
+
+    /**
+     * Type removal task (either due to cache stop or due to type undeploy).
+     */
+    private static class TypeRemoveTask implements DynamicIndexTask {
+        /** Descriptor of the type this task refers to. */
+        private final QueryTypeDescriptorImpl desc;
+
+        /**
+         * Creates a task for the given type.
+         *
+         * @param typeDesc Descriptor of the type.
+         */
+        public TypeRemoveTask(QueryTypeDescriptorImpl typeDesc) {
+            desc = typeDesc;
+        }
+
+        /**
+         * @return Type descriptor passed to the constructor.
+         */
+        public QueryTypeDescriptorImpl typeDescriptor() {
+            return desc;
+        }
+    }
+
+    /**
+     * Operation status message received.
+     */
+    private static class OperationStatusTask implements DynamicIndexTask {
+        /** ID of the node the status relates to. */
+        private final UUID nodeUuid;
+
+        /** ID of the operation the status relates to. */
+        private final UUID opUuid;
+
+        /** Error text, may be {@code null}. */
+        private final String err;
+
+        /**
+         * Creates a task holding the given status data.
+         *
+         * @param nodeId ID of the node.
+         * @param opId ID of the operation.
+         * @param errMsg Error text (if any).
+         */
+        public OperationStatusTask(UUID nodeId, UUID opId, String errMsg) {
+            err = errMsg;
+            opUuid = opId;
+            nodeUuid = nodeId;
+        }
+
+        /**
+         * @return Node ID passed to the constructor.
+         */
+        public UUID nodeId() {
+            return nodeUuid;
+        }
+
+        /**
+         * @return Operation ID passed to the constructor.
+         */
+        public UUID operationId() {
+            return opUuid;
+        }
+
+        /**
+         * @return Error text passed to the constructor, possibly {@code null}.
+         */
+        @Nullable public String errorMessage() {
+            return err;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5a7afc7b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationStatusMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationStatusMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationStatusMessage.java
new file mode 100644
index 0000000..bfec530
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationStatusMessage.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.ddl;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+/**
+ * Message with index operation status. Sent from participant to coordinator when index creation is completed or
+ * when coordinator changes.
+ */
+public class IndexOperationStatusMessage implements Message {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Operation ID; first field on the wire (state 0 in {@code writeTo}/{@code readFrom}). */
+    private UUID opId;
+
+    /** Error message; second field on the wire (state 1). NOTE(review): presumably null on success - confirm. */
+    private String errMsg;
+
+    /**
+     * Default constructor; leaves both fields unset. {@code GridIoMessageFactory} creates instances this way.
+     */
+    public IndexOperationStatusMessage() {
+        // No-op.
+    }
+
+    /**
+     * Creates a message holding the given operation ID and error message.
+     *
+     * @param opId Operation ID.
+     * @param errMsg Error message.
+     */
+    public IndexOperationStatusMessage(UUID opId, String errMsg) {
+        this.opId = opId;
+        this.errMsg = errMsg;
+    }
+
+    /**
+     * @return Operation ID.
+     */
+    public UUID operationId() {
+        return opId;
+    }
+
+    /**
+     * @return Error message.
+     */
+    public String errorMessage() {
+        return errMsg;
+    }
+
+    /** {@inheritDoc} Writes the header once, then {@code opId} and {@code errMsg} in that order. */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeUuid("opId", opId))
+                    return false;
+
+                writer.incrementState();
+                // Intentional fall-through: continue with the next field.
+            case 1:
+                if (!writer.writeString("errMsg", errMsg))
+                    return false;
+
+                writer.incrementState();
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} Reads {@code opId} and {@code errMsg} in the same order {@code writeTo} writes them. */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                opId = reader.readUuid("opId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+                // Intentional fall-through: continue with the next field.
+            case 1:
+                errMsg = reader.readString("errMsg");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+        }
+
+        return reader.afterMessageRead(IndexOperationStatusMessage.class);
+    }
+
+    /** {@inheritDoc} Same value as the {@code case -48} branch creating this message in the factory. */
+    @Override public byte directType() {
+        return -48;
+    }
+
+    /** {@inheritDoc} Two fields are handled by {@code writeTo}/{@code readFrom}: opId and errMsg. */
+    @Override public byte fieldsCount() {
+        return 2;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} Delegates to {@code S.toString}. */
+    @Override public String toString() {
+        return S.toString(IndexOperationStatusMessage.class, this);
+    }
+}