You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/03/14 14:34:56 UTC

ignite git commit: WIP on state handling.

Repository: ignite
Updated Branches:
  refs/heads/ignite-4565-ddl 4cc2b606b -> a45e24ed7


WIP on state handling.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a45e24ed
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a45e24ed
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a45e24ed

Branch: refs/heads/ignite-4565-ddl
Commit: a45e24ed715e530c5cf8c723ce0950a32e61d92c
Parents: 4cc2b60
Author: devozerov <vo...@gridgain.com>
Authored: Tue Mar 14 17:34:47 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Tue Mar 14 17:34:47 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheProcessor.java    | 19 +++++---
 .../processors/query/QueryIndexStates.java      | 47 +++++++++++++-------
 2 files changed, 43 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a45e24ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 2df2ab6..5a961fe 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1173,6 +1173,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 mgr.stop(cancel);
         }
 
+        // TODO: Make sure to notify query client futures.
         ctx.kernalContext().query().onCacheStop(ctx);
         ctx.kernalContext().continuous().onCacheStop(ctx);
 
@@ -2702,7 +2703,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 else if (msg instanceof IndexAcceptDiscoveryMessage)
                     onIndexAcceptMessage((IndexAcceptDiscoveryMessage)msg);
                 else if (msg instanceof IndexFinishDiscoveryMessage)
-                    onIndexfinishMessage((IndexFinishDiscoveryMessage)msg);
+                    onIndexFinishMessage((IndexFinishDiscoveryMessage)msg);
                 else
                     U.warn(log, "Unsupported index discovery message type (will ignore): " + msg);
             }
@@ -2750,7 +2751,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         if (idxStates == null)
             idxStates = new QueryIndexStates();
 
-        if (idxStates.propose(ctx.localNodeId(), msg))
+        if (idxStates.propose(locNodeId, msg))
             desc.indexStates(idxStates);
     }
 
@@ -2760,11 +2761,17 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @param msg Message.
      */
     private void onIndexAcceptMessage(IndexAcceptDiscoveryMessage msg) {
-        // TODO: Remove init operation from descriptor!
+        AbstractIndexOperation op = msg.operation();
+
+        DynamicCacheDescriptor desc = cacheDescriptor(op.space());
+
+        if (desc == null)
+            return;
 
-        // TODO: Handle concurrent cache stop!
+        QueryIndexStates idxStates = desc.indexStates();
 
-        // TODO: Enlist cache operation to descriptor!
+        if (idxStates == null || !idxStates.accept(msg))
+            return;
 
         // TODO: Initiate exchange-like routine!
     }
@@ -2774,7 +2781,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      *
      * @param msg Message.
      */
-    private void onIndexfinishMessage(IndexFinishDiscoveryMessage msg) {
+    private void onIndexFinishMessage(IndexFinishDiscoveryMessage msg) {
         // TODO: Clear dynamic descriptors!
 
         // TODO: Delegate to indexing to handle result and complete client futures!

http://git-wip-us.apache.org/repos/asf/ignite/blob/a45e24ed/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexStates.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexStates.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexStates.java
index 13b1525..b775dc0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexStates.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexStates.java
@@ -72,49 +72,62 @@ public class QueryIndexStates implements Serializable {
      * Process accept message propagating index from proposed to accepted state.
      *
      * @param msg Message.
+     * @return {@code True} if accept succeeded. It may fail in case of concurrent cache stop/start.
      */
-    public void accept(IndexAcceptDiscoveryMessage msg) {
+    public boolean accept(IndexAcceptDiscoveryMessage msg) {
         AbstractIndexOperation op = msg.operation();
 
         String idxName = op.indexName();
 
         QueryIndexActiveOperation curOp = activeOps.get(idxName);
 
-        assert curOp != null && !curOp.accepted(); // Operation is found and is in proposed ("false") state.
-        assert F.eq(curOp.operation().operationId(), op.operationId()); // Operation ID matches.
+        if (curOp != null) {
+            if (F.eq(curOp.operation().operationId(), op.operationId())) {
+                assert !curOp.accepted();
 
-        curOp.accept();
+                curOp.accept();
+
+                return true;
+            }
+        }
+
+        return false;
     }
 
     /**
      * Process finish message.
      *
      * @param msg Message.
+     * @return {@code True} if accept succeeded. It may fail in case of concurrent cache stop/start.
      */
-    @SuppressWarnings("ConstantConditions")
-    public void finish(IndexFinishDiscoveryMessage msg) {
+    public boolean finish(IndexFinishDiscoveryMessage msg) {
         AbstractIndexOperation op = msg.operation();
 
         String idxName = op.indexName();
 
         QueryIndexActiveOperation curOp = activeOps.remove(idxName);
 
-        assert curOp != null; // Operation is found.
-        assert F.eq(curOp.operation().operationId(), op.operationId()); // Operation ID matches.
+        if (curOp != null) {
+            if (F.eq(curOp.operation().operationId(), op.operationId())) {
+                if (!msg.hasError()) {
+                    QueryIndexState state;
 
-        if (!msg.hasError()) {
-            QueryIndexState state;
+                    if (op instanceof CreateIndexOperation)
+                        state = new QueryIndexState(idxName, ((CreateIndexOperation)op).index());
+                    else {
+                        assert op instanceof DropIndexOperation;
 
-            if (op instanceof CreateIndexOperation)
-                state = new QueryIndexState(idxName, ((CreateIndexOperation)op).index());
-            else {
-                assert op instanceof DropIndexOperation;
+                        state = new QueryIndexState(idxName, null);
+                    }
 
-                state = new QueryIndexState(idxName, null);
-            }
+                    readyOps.put(idxName, state);
+                }
 
-            readyOps.put(idxName, state);
+                return true;
+            }
         }
+
+        return false;
     }
 
     /**