You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/03/14 14:34:56 UTC
ignite git commit: WIP on state handling.
Repository: ignite
Updated Branches:
refs/heads/ignite-4565-ddl 4cc2b606b -> a45e24ed7
WIP on state handling.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a45e24ed
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a45e24ed
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a45e24ed
Branch: refs/heads/ignite-4565-ddl
Commit: a45e24ed715e530c5cf8c723ce0950a32e61d92c
Parents: 4cc2b60
Author: devozerov <vo...@gridgain.com>
Authored: Tue Mar 14 17:34:47 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Tue Mar 14 17:34:47 2017 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheProcessor.java | 19 +++++---
.../processors/query/QueryIndexStates.java | 47 +++++++++++++-------
2 files changed, 43 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/a45e24ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 2df2ab6..5a961fe 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1173,6 +1173,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
mgr.stop(cancel);
}
+ // TODO: Make sure to notify query client futures.
ctx.kernalContext().query().onCacheStop(ctx);
ctx.kernalContext().continuous().onCacheStop(ctx);
@@ -2702,7 +2703,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
else if (msg instanceof IndexAcceptDiscoveryMessage)
onIndexAcceptMessage((IndexAcceptDiscoveryMessage)msg);
else if (msg instanceof IndexFinishDiscoveryMessage)
- onIndexfinishMessage((IndexFinishDiscoveryMessage)msg);
+ onIndexFinishMessage((IndexFinishDiscoveryMessage)msg);
else
U.warn(log, "Unsupported index discovery message type (will ignore): " + msg);
}
@@ -2750,7 +2751,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (idxStates == null)
idxStates = new QueryIndexStates();
- if (idxStates.propose(ctx.localNodeId(), msg))
+ if (idxStates.propose(locNodeId, msg))
desc.indexStates(idxStates);
}
@@ -2760,11 +2761,17 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @param msg Message.
*/
private void onIndexAcceptMessage(IndexAcceptDiscoveryMessage msg) {
- // TODO: Remove init operation from descriptor!
+ AbstractIndexOperation op = msg.operation();
+
+ DynamicCacheDescriptor desc = cacheDescriptor(op.space());
+
+ if (desc == null)
+ return;
- // TODO: Handle concurrent cache stop!
+ QueryIndexStates idxStates = desc.indexStates();
- // TODO: Enlist cache operation to descriptor!
+ if (idxStates == null || !idxStates.accept(msg))
+ return;
// TODO: Initiate exchange-like routine!
}
@@ -2774,7 +2781,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
*
* @param msg Message.
*/
- private void onIndexfinishMessage(IndexFinishDiscoveryMessage msg) {
+ private void onIndexFinishMessage(IndexFinishDiscoveryMessage msg) {
// TODO: Clear dynamic descriptors!
// TODO: Delegate to indexing to handle result and complete client futures!
http://git-wip-us.apache.org/repos/asf/ignite/blob/a45e24ed/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexStates.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexStates.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexStates.java
index 13b1525..b775dc0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexStates.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexStates.java
@@ -72,49 +72,62 @@ public class QueryIndexStates implements Serializable {
* Process accept message propagating index from proposed to accepted state.
*
* @param msg Message.
+ * @return {@code True} if accept succeeded. It may fail in case of concurrent cache stop/start.
*/
- public void accept(IndexAcceptDiscoveryMessage msg) {
+ public boolean accept(IndexAcceptDiscoveryMessage msg) {
AbstractIndexOperation op = msg.operation();
String idxName = op.indexName();
QueryIndexActiveOperation curOp = activeOps.get(idxName);
- assert curOp != null && !curOp.accepted(); // Operation is found and is in proposed ("false") state.
- assert F.eq(curOp.operation().operationId(), op.operationId()); // Operation ID matches.
+ if (curOp != null) {
+ if (F.eq(curOp.operation().operationId(), op.operationId())) {
+ assert !curOp.accepted();
- curOp.accept();
+ curOp.accept();
+
+ return true;
+ }
+ }
+
+ return false;
}
/**
* Process finish message.
*
* @param msg Message.
+ * @return {@code True} if accept succeeded. It may fail in case of concurrent cache stop/start.
*/
- @SuppressWarnings("ConstantConditions")
- public void finish(IndexFinishDiscoveryMessage msg) {
+ public boolean finish(IndexFinishDiscoveryMessage msg) {
AbstractIndexOperation op = msg.operation();
String idxName = op.indexName();
QueryIndexActiveOperation curOp = activeOps.remove(idxName);
- assert curOp != null; // Operation is found.
- assert F.eq(curOp.operation().operationId(), op.operationId()); // Operation ID matches.
+ if (curOp != null) {
+ if (F.eq(curOp.operation().operationId(), op.operationId())) {
+ if (!msg.hasError()) {
+ QueryIndexState state;
- if (!msg.hasError()) {
- QueryIndexState state;
+ if (op instanceof CreateIndexOperation)
+ state = new QueryIndexState(idxName, ((CreateIndexOperation)op).index());
+ else {
+ assert op instanceof DropIndexOperation;
- if (op instanceof CreateIndexOperation)
- state = new QueryIndexState(idxName, ((CreateIndexOperation)op).index());
- else {
- assert op instanceof DropIndexOperation;
+ state = new QueryIndexState(idxName, null);
+ }
- state = new QueryIndexState(idxName, null);
- }
+ readyOps.put(idxName, state);
+ }
- readyOps.put(idxName, state);
+ return true;
+ }
}
+
+ return false;
}
/**