You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/03/21 15:36:50 UTC

[2/2] ignite git commit: Implemented operation state management.

Implemented operation state management.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/41a7beac
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/41a7beac
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/41a7beac

Branch: refs/heads/ignite-4565-ddl
Commit: 41a7beac9d470d16e4aa39bd345c5ce02adbed59
Parents: c71fb3d
Author: devozerov <vo...@gridgain.com>
Authored: Tue Mar 21 18:36:40 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Tue Mar 21 18:36:40 2017 +0300

----------------------------------------------------------------------
 .../processors/query/GridQueryProcessor.java    |  49 ++--
 .../query/ddl/IndexOperationHandler.java        |   7 +
 .../query/ddl/IndexOperationState.java          | 251 +++++++++++++++++++
 3 files changed, 289 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/41a7beac/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 9d98115..92bc7c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -316,8 +316,11 @@ public class GridQueryProcessor extends GridProcessorAdapter {
         }
 
         // Apply dynamic changes to candidates.
+        Collection<AbstractIndexOperation> ops = new ArrayList<>();
+
         if (initIdxStates != null) {
             Map<String, QueryIndexState> readyIdxStates = initIdxStates.readyOperations();
+            Map<String, QueryIndexActiveOperation> acceptedOps = initIdxStates.acceptedActiveOperations();
 
             for (QueryTypeCandidate cand : cands) {
                 QueryTypeDescriptorImpl desc = cand.descriptor();
@@ -329,14 +332,29 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                     if (F.eq(desc.tableName(), idxState.tableName()))
                         QueryUtils.processDynamicIndexChange(idxName, idxState.index(), desc);
                 }
+
+                for (Map.Entry<String, QueryIndexActiveOperation> acceptedOpEntry : acceptedOps.entrySet()) {
+                    String idxName = acceptedOpEntry.getKey();
+                    AbstractIndexOperation op = acceptedOpEntry.getValue().operation();
+
+                    if (F.eq(desc.tableName(), op.tableName())) {
+                        QueryIndex idx = op instanceof CreateIndexOperation ? ((CreateIndexOperation)op).index() : null;
+
+                        QueryUtils.processDynamicIndexChange(idxName, idx, desc);
+                    }
+
+                    ops.add(op);
+                }
             }
         }
 
-        // TODO: Apply pending operations right away!
-
         // Ready to register at this point.
         registerCache0(space, cctx, cands);
 
+        // If cache was registered successfully, start pending operations.
+        for (AbstractIndexOperation op : ops)
+            startIndexOperation(op, true, null);
+
         // Warn about possible implicit deserialization.
         if (!mustDeserializeClss.isEmpty()) {
             U.warn(log, "Some classes in query configuration cannot be written in binary format " +
@@ -348,22 +366,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
         }
     }
 
-    /**
-     * Find current coordinator.
-     *
-     * @return {@code True} if node is coordinator.
-     */
-    private ClusterNode findCoordinator() {
-        ClusterNode res = null;
-
-        for (ClusterNode node : ctx.discovery().aliveServerNodes()) {
-            if (res == null || res.order() > node.order())
-                res = node;
-        }
-
-        return res;
-    }
-
     /** {@inheritDoc} */
     @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
         if (idx != null)
@@ -1308,6 +1310,17 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Start index operation.
+     *
+     * @param op Operation.
+     * @param completed Completed flag.
+     * @param err Error.
+     */
+    private void startIndexOperation(AbstractIndexOperation op, boolean completed, Exception err) {
+        // TODO
+    }
+
+    /**
      * Process status request.
      *
      * @param req Status request.

http://git-wip-us.apache.org/repos/asf/ignite/blob/41a7beac/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java
index 295a0022..7825877 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java
@@ -123,6 +123,13 @@ public class IndexOperationHandler {
     }
 
     /**
+     * @return Operation.
+     */
+    public AbstractIndexOperation operation() {
+        return op;
+    }
+
+    /**
      * @return Future completed when operation is ready.
      */
     public IgniteInternalFuture future() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/41a7beac/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationState.java
new file mode 100644
index 0000000..75ab2e7
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationState.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.ddl;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.ignite.internal.GridTopic.TOPIC_DYNAMIC_SCHEMA;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL;
+
+/**
+ * Current index operation state.
+ */
+public class IndexOperationState {
+    /** Kernal context. */
+    private final GridKernalContext ctx;
+
+    /** Query processor. */
+    private final GridQueryProcessor qryProc;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Operation handler. */
+    private final IndexOperationHandler hnd;
+
+    /** Mutex for concurrency control. */
+    private final Object mux = new Object();
+
+    /** Whether node is coordinator. */
+    private boolean crd;
+
+    /** Participants. */
+    private Collection<UUID> nodeIds;
+
+    /** Node results. */
+    private Map<UUID, String> nodeRess;
+
+    /**
+     * Constructor.
+     *
+     * @param ctx Kernal context.
+     * @param qryProc Query processor.
+     * @param hnd Handler of the operation this state belongs to.
+     */
+    public IndexOperationState(GridKernalContext ctx, GridQueryProcessor qryProc, IndexOperationHandler hnd) {
+        this.ctx = ctx;
+
+        log = ctx.log(IndexOperationState.class);
+
+        this.qryProc = qryProc;
+        this.hnd = hnd;
+    }
+
+    /**
+     * Try to take coordinator role for the operation; does nothing if local node is not the coordinator.
+     */
+    @SuppressWarnings("unchecked")
+    public void tryMap() {
+        synchronized (mux) {
+            if (isLocalCoordinator()) {
+                // Initialize coordinator-only structures: tracked participants and their results.
+                crd = true;
+                nodeIds = new HashSet<>();
+                nodeRess = new HashMap<>();
+
+                // Send status requests to all remote alive server nodes.
+                IndexOperationStatusRequest req =
+                    new IndexOperationStatusRequest(ctx.localNodeId(), hnd.operation().operationId());
+
+                for (ClusterNode alive : ctx.discovery().aliveServerNodes()) {
+                    nodeIds.add(alive.id());
+
+                    if (!alive.isLocal()) {
+                        try {
+                            // TODO: Proper pool!
+                            ctx.io().sendToGridTopic(alive, TOPIC_DYNAMIC_SCHEMA, req, PUBLIC_POOL);
+                        }
+                        catch (IgniteCheckedException e) {
+                            // Send failed: assume node has left the grid and stop waiting for its result.
+                            nodeIds.remove(alive.id());
+                        }
+                    }
+                }
+
+                // Local node is a participant too: record its result when local handler future completes.
+                hnd.future().listen(new IgniteInClosure<IgniteInternalFuture>() {
+                    @Override public void apply(IgniteInternalFuture fut) {
+                        try {
+                            fut.get();
+
+                            onNodeFinished(ctx.localNodeId(), null);
+                        }
+                        catch (Exception e) {
+                            onNodeFinished(ctx.localNodeId(), e.getMessage());
+                        }
+                    }
+                });
+            }
+        }
+    }
+
+    /**
+     * Record node's result; ignored unless local node is coordinator and node is a tracked participant.
+     *
+     * @param nodeId Node ID.
+     * @param errMsg Error message, {@code null} if node finished successfully.
+     */
+    public void onNodeFinished(UUID nodeId, String errMsg) {
+        synchronized (mux) {
+            if (!crd || !nodeIds.contains(nodeId) || nodeRess.containsKey(nodeId))
+                return; // Not a coordinator, node is not tracked (e.g. already left), or duplicate result.
+
+            nodeRess.put(nodeId, errMsg);
+
+            checkFinished();
+        }
+    }
+
+    /**
+     * Handle node leave event.
+     *
+     * @param nodeId ID of the node that left.
+     */
+    public void onNodeLeave(UUID nodeId) {
+        synchronized (mux) {
+            if (crd) {
+                // Stop tracking the node (and drop its result, if any), so it no longer blocks completion.
+                if (nodeIds.remove(nodeId))
+                    nodeRess.remove(nodeId);
+
+                checkFinished();
+            }
+            else
+                // Local node may have become coordinator after this leave, so try to map again.
+                tryMap();
+        }
+    }
+
+    /**
+     * Handle status request: reply with local operation result once local handler future completes.
+     *
+     * @param nodeId ID of the requesting node; status response is sent to it.
+     */
+    @SuppressWarnings("unchecked")
+    public void onStatusRequest(final UUID nodeId) {
+        hnd.future().listen(new IgniteInClosure<IgniteInternalFuture>() {
+            @Override public void apply(IgniteInternalFuture fut) {
+                String errMsg = null; // Stays null if local operation completed without exception.
+
+                try {
+                    fut.get();
+                }
+                catch (Exception e) {
+                    errMsg = e.getMessage();
+                }
+
+                try {
+                    IndexOperationStatusResponse resp =
+                        new IndexOperationStatusResponse(ctx.localNodeId(), hnd.operation().operationId(), errMsg);
+
+                    // TODO: Proper pool!
+                    ctx.io().sendToGridTopic(nodeId, TOPIC_DYNAMIC_SCHEMA, resp, PUBLIC_POOL);
+                }
+                catch (IgniteCheckedException e) {
+                    // Failed to send response, presumably because requesting node left; ignore.
+                    // TODO: Better logging all over the state and handler to simplify debug!
+                }
+            }
+        });
+    }
+
+    /**
+     * Check whether local node is the coordinator, i.e. the alive server node with the smallest order.
+     *
+     * @return {@code True} if local node is coordinator.
+     */
+    private boolean isLocalCoordinator() {
+        ClusterNode res = null;
+
+        for (ClusterNode node : ctx.discovery().aliveServerNodes()) {
+            if (res == null || res.order() > node.order())
+                res = node;
+        }
+
+        assert res != null; // Operation state can only exist on server nodes.
+
+        return F.eq(ctx.localNodeId(), res.id());
+    }
+
+    /**
+     * Send finish discovery message if results are collected from all tracked nodes. Coordinator only.
+     */
+    private void checkFinished() {
+        assert Thread.holdsLock(mux);
+        assert crd;
+
+        if (nodeIds.size() == nodeRess.size()) {
+            // All tracked nodes reported: pick any reported error (if present) and initiate finish.
+            UUID errNodeId = null;
+            String errNodeMsg = null;
+
+            for (Map.Entry<UUID, String> nodeRes : nodeRess.entrySet()) {
+                if (nodeRes.getValue() != null) {
+                    errNodeId = nodeRes.getKey();
+                    errNodeMsg = nodeRes.getValue();
+
+                    break;
+                }
+            }
+
+            IndexFinishDiscoveryMessage msg = new IndexFinishDiscoveryMessage(hnd.operation(), errNodeId, errNodeMsg);
+
+            try {
+                ctx.discovery().sendCustomEvent(msg);
+            }
+            catch (Exception e) {
+                // Failed to send finish message over discovery. Considered unrecoverable, so only warn.
+                U.warn(log, "Failed to send index finish discovery message [op=" + hnd.operation() + ']', e);
+            }
+        }
+    }
+}
\ No newline at end of file