You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/03/21 15:36:50 UTC
[2/2] ignite git commit: Implemented operation state management.
Implemented operation state management.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/41a7beac
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/41a7beac
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/41a7beac
Branch: refs/heads/ignite-4565-ddl
Commit: 41a7beac9d470d16e4aa39bd345c5ce02adbed59
Parents: c71fb3d
Author: devozerov <vo...@gridgain.com>
Authored: Tue Mar 21 18:36:40 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Tue Mar 21 18:36:40 2017 +0300
----------------------------------------------------------------------
.../processors/query/GridQueryProcessor.java | 49 ++--
.../query/ddl/IndexOperationHandler.java | 7 +
.../query/ddl/IndexOperationState.java | 251 +++++++++++++++++++
3 files changed, 289 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/41a7beac/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 9d98115..92bc7c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -316,8 +316,11 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
// Apply dynamic changes to candidates.
+ Collection<AbstractIndexOperation> ops = new ArrayList<>();
+
if (initIdxStates != null) {
Map<String, QueryIndexState> readyIdxStates = initIdxStates.readyOperations();
+ Map<String, QueryIndexActiveOperation> acceptedOps = initIdxStates.acceptedActiveOperations();
for (QueryTypeCandidate cand : cands) {
QueryTypeDescriptorImpl desc = cand.descriptor();
@@ -329,14 +332,29 @@ public class GridQueryProcessor extends GridProcessorAdapter {
if (F.eq(desc.tableName(), idxState.tableName()))
QueryUtils.processDynamicIndexChange(idxName, idxState.index(), desc);
}
+
+ for (Map.Entry<String, QueryIndexActiveOperation> acceptedOpEntry : acceptedOps.entrySet()) {
+ String idxName = acceptedOpEntry.getKey();
+ AbstractIndexOperation op = acceptedOpEntry.getValue().operation();
+
+ if (F.eq(desc.tableName(), op.tableName())) {
+ QueryIndex idx = op instanceof CreateIndexOperation ? ((CreateIndexOperation)op).index() : null;
+
+ QueryUtils.processDynamicIndexChange(idxName, idx, desc);
+ }
+
+ ops.add(op);
+ }
}
}
- // TODO: Apply pending operations right away!
-
// Ready to register at this point.
registerCache0(space, cctx, cands);
+ // If cache was registered successfully, start pending operations.
+ for (AbstractIndexOperation op : ops)
+ startIndexOperation(op, true, null);
+
// Warn about possible implicit deserialization.
if (!mustDeserializeClss.isEmpty()) {
U.warn(log, "Some classes in query configuration cannot be written in binary format " +
@@ -348,22 +366,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
}
- /**
- * Find current coordinator.
- *
- * @return {@code True} if node is coordinator.
- */
- private ClusterNode findCoordinator() {
- ClusterNode res = null;
-
- for (ClusterNode node : ctx.discovery().aliveServerNodes()) {
- if (res == null || res.order() > node.order())
- res = node;
- }
-
- return res;
- }
-
/** {@inheritDoc} */
@Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
if (idx != null)
@@ -1308,6 +1310,17 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
/**
+ * Start index operation.
+ *
+ * @param op Operation.
+ * @param completed Completed flag.
+ * @param err Error.
+ */
+ private void startIndexOperation(AbstractIndexOperation op, boolean completed, Exception err) {
+ // TODO: Not implemented yet; the body is empty, so op, completed and err are currently ignored.
+ }
+
+ /**
* Process status request.
*
* @param req Status request.
http://git-wip-us.apache.org/repos/asf/ignite/blob/41a7beac/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java
index 295a0022..7825877 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java
@@ -123,6 +123,13 @@ public class IndexOperationHandler {
}
/**
+ * @return Operation.
+ */
+ public AbstractIndexOperation operation() {
+ return op; // Plain accessor, no copying. NOTE(review): `op` is declared outside this hunk.
+ }
+
+ /**
* @return Future completed when operation is ready.
*/
public IgniteInternalFuture future() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/41a7beac/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationState.java
new file mode 100644
index 0000000..75ab2e7
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationState.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.ddl;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.ignite.internal.GridTopic.TOPIC_DYNAMIC_SCHEMA;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL;
+
+/**
+ * Current index operation state.
+ */
+public class IndexOperationState {
+ /** Kernal context; used here to reach discovery, IO and the local node ID. */
+ private final GridKernalContext ctx;
+
+ /** Query processor. NOTE(review): assigned in the constructor but not read anywhere in this class as shown. */
+ private final GridQueryProcessor qryProc;
+
+ /** Logger. */
+ private final IgniteLogger log;
+
+ /** Operation handler; supplies the operation and the future listened to for local completion. */
+ private final IndexOperationHandler hnd;
+
+ /** Mutex for concurrency control; guards {@code crd}, {@code nodeIds} and {@code nodeRess}. */
+ private final Object mux = new Object();
+
+ /** Whether this node has mapped the operation as coordinator (set to {@code true} in {@code tryMap()}). */
+ private boolean crd;
+
+ /** Participants: IDs of nodes whose result is awaited; stays {@code null} until {@code tryMap()} initializes it. */
+ private Collection<UUID> nodeIds;
+
+ /** Node results: node ID to error message, {@code null} message meaning success; {@code null} map until {@code tryMap()}. */
+ private Map<UUID, String> nodeRess;
+
+ /**
+  * Creates operation state bound to the given handler.
+  *
+  * @param ctx Kernal context; also the source of this class' logger.
+  * @param qryProc Query processor.
+  * @param hnd Operation handler.
+  */
+ public IndexOperationState(GridKernalContext ctx, GridQueryProcessor qryProc, IndexOperationHandler hnd) {
+     this.ctx = ctx;
+     this.qryProc = qryProc;
+     this.hnd = hnd;
+
+     this.log = ctx.log(IndexOperationState.class);
+ }
+
+ /**
+  * Map operation handling.
+  * <p>
+  * If the local node is the current coordinator, starts tracking all alive server nodes, sends a status
+  * request to every remote one and subscribes to completion of the local operation future. Does nothing
+  * if the local node is not the coordinator.
+  */
+ @SuppressWarnings("unchecked")
+ public void tryMap() {
+     synchronized (mux) {
+         if (!isLocalCoordinator())
+             return;
+
+         // Initialize local structure.
+         crd = true;
+         nodeIds = new HashSet<>();
+         nodeRess = new HashMap<>();
+
+         // Send remote requests.
+         IndexOperationStatusRequest req =
+             new IndexOperationStatusRequest(ctx.localNodeId(), hnd.operation().operationId());
+
+         for (ClusterNode alive : ctx.discovery().aliveServerNodes()) {
+             nodeIds.add(alive.id());
+
+             if (!alive.isLocal()) {
+                 try {
+                     // TODO: Proper pool!
+                     ctx.io().sendToGridTopic(alive, TOPIC_DYNAMIC_SCHEMA, req, PUBLIC_POOL);
+                 }
+                 catch (IgniteCheckedException e) {
+                     // Node has left the grid, so do not wait for its result.
+                     nodeIds.remove(alive.id());
+
+                     if (log.isDebugEnabled())
+                         log.debug("Failed to send index operation status request, node is not awaited " +
+                             "[nodeId=" + alive.id() + ", err=" + e.getMessage() + ']');
+                 }
+             }
+         }
+
+         // Listen for local completion.
+         hnd.future().listen(new IgniteInClosure<IgniteInternalFuture>() {
+             @Override public void apply(IgniteInternalFuture fut) {
+                 String errMsg = null;
+
+                 try {
+                     fut.get();
+                 }
+                 catch (Exception e) {
+                     // A null error message is interpreted as success when results are evaluated,
+                     // so never report a failure with a null message.
+                     errMsg = e.getMessage() != null ? e.getMessage() : e.toString();
+                 }
+
+                 onNodeFinished(ctx.localNodeId(), errMsg);
+             }
+         });
+     }
+ }
+
+ /**
+  * Handle node finish.
+  * <p>
+  * The result is recorded only on a node that has mapped the operation as coordinator, only for a node
+  * that is still awaited, and only once per node; anything else is ignored.
+  *
+  * @param nodeId Node ID.
+  * @param errMsg Error message, {@code null} if the node finished successfully.
+  */
+ public void onNodeFinished(UUID nodeId, String errMsg) {
+     synchronized (mux) {
+         // Result structures exist only after this node mapped the operation as coordinator.
+         if (!crd)
+             return;
+
+         // Skip duplicates and results of nodes we no longer wait for (left the grid or request send
+         // failed): recording them would break the completion check.
+         if (!nodeIds.contains(nodeId) || nodeRess.containsKey(nodeId))
+             return;
+
+         nodeRess.put(nodeId, errMsg);
+
+         checkFinished();
+     }
+ }
+
+ /**
+ * Handle node leave event.
+ *
+ * @param nodeId Node ID.
+ */
+ public void onNodeLeave(UUID nodeId) {
+ synchronized (mux) {
+ if (crd) {
+ // Handle this as success.
+ if (nodeIds.remove(nodeId))
+ nodeRess.remove(nodeId);
+
+ checkFinished();
+ }
+ else
+ // We can become coordinator, so try remap.
+ tryMap();
+ }
+ }
+
+ /**
+  * Handle status request: once the local operation future completes, send a status response with the
+  * outcome (error message or {@code null} on success) to the requesting node.
+  *
+  * @param nodeId ID of the node that requested the status.
+  */
+ @SuppressWarnings("unchecked")
+ public void onStatusRequest(final UUID nodeId) {
+     hnd.future().listen(new IgniteInClosure<IgniteInternalFuture>() {
+         @Override public void apply(IgniteInternalFuture fut) {
+             String errMsg = null;
+
+             try {
+                 fut.get();
+             }
+             catch (Exception e) {
+                 errMsg = e.getMessage();
+
+                 // A null message in the response means success, so a failure must never carry null.
+                 if (errMsg == null)
+                     errMsg = e.toString();
+             }
+
+             try {
+                 IndexOperationStatusResponse resp =
+                     new IndexOperationStatusResponse(ctx.localNodeId(), hnd.operation().operationId(), errMsg);
+
+                 // TODO: Proper pool!
+                 ctx.io().sendToGridTopic(nodeId, TOPIC_DYNAMIC_SCHEMA, resp, PUBLIC_POOL);
+             }
+             catch (IgniteCheckedException e) {
+                 // Node left, nothing else to do besides leaving a trace for debugging.
+                 // TODO: Better logging all over the state and handler to simplify debug!
+                 if (log.isDebugEnabled())
+                     log.debug("Failed to send index operation status response [nodeId=" + nodeId +
+                         ", err=" + e.getMessage() + ']');
+             }
+         }
+     });
+ }
+
+ /**
+  * Checks whether the local node is the current coordinator, i.e. the alive server node with the
+  * smallest order.
+  *
+  * @return {@code True} if local node is coordinator.
+  */
+ private boolean isLocalCoordinator() {
+     ClusterNode oldest = null;
+
+     for (ClusterNode candidate : ctx.discovery().aliveServerNodes()) {
+         if (oldest == null) {
+             oldest = candidate;
+
+             continue;
+         }
+
+         if (candidate.order() < oldest.order())
+             oldest = candidate;
+     }
+
+     assert oldest != null; // Operation state can only exist on server nodes.
+
+     return F.eq(ctx.localNodeId(), oldest.id());
+ }
+
+ /**
+  * Check if operation finished: when a result is known for every awaited node, send the finish message
+  * over discovery, carrying the first error found among participants (if any).
+  * <p>
+  * Must be called with {@code mux} held on a node acting as coordinator.
+  */
+ private void checkFinished() {
+     assert Thread.holdsLock(mux);
+     assert crd;
+
+     // Decide by membership rather than by comparing sizes: a result recorded for a node that is not
+     // awaited must not be able to stand in for a participant which has not answered yet.
+     for (UUID nodeId : nodeIds) {
+         if (!nodeRess.containsKey(nodeId))
+             return;
+     }
+
+     // Initiate finish request. Only results of current participants are taken into account.
+     UUID errNodeId = null;
+     String errNodeMsg = null;
+
+     for (UUID nodeId : nodeIds) {
+         String nodeErrMsg = nodeRess.get(nodeId);
+
+         if (nodeErrMsg != null) {
+             errNodeId = nodeId;
+             errNodeMsg = nodeErrMsg;
+
+             break;
+         }
+     }
+
+     IndexFinishDiscoveryMessage msg = new IndexFinishDiscoveryMessage(hnd.operation(), errNodeId, errNodeMsg);
+
+     // NOTE(review): nothing here remembers that the message was already sent, so a later call made
+     // after completion sends it again - confirm receivers tolerate duplicates.
+     try {
+         ctx.discovery().sendCustomEvent(msg);
+     }
+     catch (Exception e) {
+         // Failed to send finish message over discovery. This is something unrecoverable.
+         U.warn(log, "Failed to send index finish discovery message [op=" + hnd.operation() + ']', e);
+     }
+ }
+}
\ No newline at end of file