You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/03/17 11:45:02 UTC

[2/2] ignite git commit: WIP on state concurrency.

WIP on state concurrency.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4376670b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4376670b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4376670b

Branch: refs/heads/ignite-4565-ddl
Commit: 4376670bbf6354deafbd787dca10d8cbe26fc7e5
Parents: d7bf1b7
Author: devozerov <vo...@gridgain.com>
Authored: Fri Mar 17 14:44:52 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Fri Mar 17 14:44:52 2017 +0300

----------------------------------------------------------------------
 .../cache/CacheAffinitySharedManager.java       |   2 +-
 .../cache/DynamicCacheChangeRequest.java        |   2 +-
 .../cache/DynamicCacheDescriptor.java           | 120 +++++++++++++++++--
 .../processors/cache/GridCacheProcessor.java    |  64 +++++-----
 .../processors/query/QueryIndexStates.java      |   7 +-
 .../ddl/IndexAbstractDiscoveryMessage.java      |  17 +++
 6 files changed, 154 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4376670b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index d2b09a8..0ce8718 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -352,7 +352,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                     false,
                     req.deploymentId());
 
-                desc.indexStates(req.indexStates());
+                desc.tryUpdateFromDiscovery(req.indexStates());
 
                 DynamicCacheDescriptor old = registeredCaches.put(cacheId, desc);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4376670b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
index bd2ea17..3610a60 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
@@ -301,7 +301,7 @@ public class DynamicCacheChangeRequest implements Serializable {
      * @param idxStates Index states.
      */
     public void indexStates(QueryIndexStates idxStates) {
-        this.idxStates = idxStates;
+        this.idxStates = idxStates != null ? idxStates.copy() : null;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/4376670b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index a90fb72..a6de0ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -25,10 +25,15 @@ import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.plugin.CachePluginManager;
 import org.apache.ignite.internal.processors.query.QueryIndexStates;
+import org.apache.ignite.internal.processors.query.ddl.IndexAcceptDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.ddl.IndexFinishDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.ddl.IndexProposeDiscoveryMessage;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
@@ -82,10 +87,16 @@ public class DynamicCacheDescriptor {
     /** */
     private AffinityTopologyVersion rcvdFromVer;
 
-    /** Initial dynamic index state as observed from cache processor start() method and discovery join process. */
-    private QueryIndexStates initIdxStates;
+    /** Mutex to control index states. */
+    private final Object idxStatesMux = new Object();
 
-    /** Dynamic index states. */
+    /** Initial index states which are used to start cache. */
+    private QueryIndexStates idxStatesForStart;
+
+    /** Whether index states for start are fixed. */
+    private boolean idxStatesForStartFixed;
+
+    /** Current index states. */
     private QueryIndexStates idxStates;
 
     /**
@@ -310,31 +321,114 @@ public class DynamicCacheDescriptor {
     }
 
     /**
+     * Get index state for cache start. Once requested it never changes afterwards.
+     *
+     * @return Index states for cache start.
+     */
+    public QueryIndexStates indexStatesForStart() {
+        synchronized (idxStatesMux) {
+            if (!idxStatesForStartFixed) {
+                idxStatesForStart = idxStates;
+
+                idxStatesForStartFixed = true;
+            }
+
+            return idxStatesForStart != null ? idxStatesForStart.copy() : null;
+        }
+    }
+
+    /**
      * @return Index states.
      */
-    public QueryIndexStates initialIndexStates() {
-        return initIdxStates;
+    public QueryIndexStates indexStates() {
+        synchronized (idxStatesMux) {
+            return idxStates != null ? idxStates.copy() : null;
+        }
     }
 
     /**
-     * @param initIdxStates Index states.
+     * Try updating index state from discovery thread. The update is applied only if start state is not fixed yet;
+     * otherwise the call is a no-op.
+     *
+     * @param idxStates Index states.
      */
-    public void initialIndexStates(QueryIndexStates initIdxStates) {
-        this.initIdxStates = initIdxStates != null ? initIdxStates.copy() : null;
+    public void tryUpdateFromDiscovery(QueryIndexStates idxStates) {
+        synchronized (idxStatesMux) {
+            if (!idxStatesForStartFixed)
+                this.idxStates = idxStates != null ? idxStates.copy() : null;
+        }
     }
 
     /**
-     * @return Index states.
+     * Try performing propose from discovery thread.
+     *
+     * @param locNodeId Local node ID.
+     * @param msg Message.
      */
-    public QueryIndexStates indexStates() {
-        return idxStates;
+    public void tryProposeFromDiscoveryThread(UUID locNodeId, IndexProposeDiscoveryMessage msg) {
+        synchronized (idxStatesMux) {
+            if (idxStates == null)
+                idxStates = new QueryIndexStates();
+
+            idxStates.propose(locNodeId, msg);
+        }
     }
 
     /**
+     * Try applying accept message.
+     *
+     * @param msg Message.
+     * @param disco Whether call is performed from discovery thread.
+     * @return {@code False} if deferred to exchange worker, otherwise result of {@code QueryIndexStates.accept()}.
+     */
+    public boolean tryAccept(IndexAcceptDiscoveryMessage msg, boolean disco) {
+        synchronized (idxStatesMux) {
+            if (disco && idxStatesForStartFixed) {
+                msg.exchange(true);
+
+                return false;
+            }
+
+            if (idxStates == null)
+                idxStates = new QueryIndexStates();
+
+            return idxStates.accept(msg);
+        }
+    }
+
+    /**
+     * Try applying finish message.
+     *
+     * @param msg Message.
+     * @param disco Whether call is performed from discovery thread.
+     * @return {@code False} if deferred to exchange worker, otherwise result of {@code QueryIndexStates.finish()}.
+     */
+    public boolean tryFinish(IndexFinishDiscoveryMessage msg, boolean disco) {
+        synchronized (idxStatesMux) {
+            if (disco && idxStatesForStartFixed) {
+                msg.exchange(true);
+
+                return false;
+            }
+
+            if (idxStates == null)
+                idxStates = new QueryIndexStates();
+
+            return idxStates.finish(msg);
+        }
+    }
+
+    /**
+     * Forcefully update index states from exchange thread.
+     *
      * @param idxStates Index states.
      */
-    public void indexStates(QueryIndexStates idxStates) {
-        this.idxStates = idxStates != null ? idxStates.copy() : null;
+    public void updateIndexStatesFromExchange(QueryIndexStates idxStates) {
+        synchronized (idxStatesMux) {
+            assert idxStatesForStartFixed;
+
+            this.idxStates = idxStates != null ? idxStates.copy() : null;
+        }
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/4376670b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 8b8f5c7..8ae3283 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -825,7 +825,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
                     caches.put(maskNull(name), cache);
 
-                    startCache(cache, desc.indexStates());
+                    startCache(cache, desc.indexStatesForStart());
 
                     jCacheProxies.put(maskNull(name), new IgniteCacheProxy(ctx, cache, null, false));
                 }
@@ -1698,7 +1698,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                         null,
                         desc.deploymentId(),
                         topVer,
-                        desc.indexStates()
+                        desc.indexStatesForStart()
                     );
                 }
             }
@@ -2133,7 +2133,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                                     ccfg.getCacheMode());
                         }
 
-                        existing.indexStates(req.indexStates());
+                        existing.tryUpdateFromDiscovery(req.indexStates());
                     }
                     else {
                         assert req.cacheType() != null : req;
@@ -2145,8 +2145,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                                 false,
                                 req.deploymentId());
 
-                        desc.initialIndexStates(req.indexStates());
-                        desc.indexStates(req.indexStates());
+                        desc.tryUpdateFromDiscovery(req.indexStates());
 
                         // Received statically configured cache.
                         if (req.initiatingNodeId() == null)
@@ -2695,21 +2694,21 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
             IgniteUuid id = msg0.id();
 
-            boolean res = idxDiscoMsgIdHist.add(id);
-
-            if (!idxDiscoMsgIdHist.add(id))
+            if (!idxDiscoMsgIdHist.add(id)) {
                 U.warn(log, "Received duplicate index change discovery message (will ignore): " + msg);
-            else {
-                if (msg instanceof IndexProposeDiscoveryMessage)
-                    onIndexProposeMessage((IndexProposeDiscoveryMessage)msg);
-                else if (msg instanceof IndexAcceptDiscoveryMessage)
-                    onIndexAcceptMessage((IndexAcceptDiscoveryMessage)msg);
-                else if (msg instanceof IndexFinishDiscoveryMessage)
-                    onIndexFinishMessage((IndexFinishDiscoveryMessage)msg);
-                else
-                    U.warn(log, "Unsupported index discovery message type (will ignore): " + msg);
+
+                return false;
             }
 
+            if (msg instanceof IndexProposeDiscoveryMessage)
+                onIndexProposeMessage((IndexProposeDiscoveryMessage)msg);
+            else if (msg instanceof IndexAcceptDiscoveryMessage)
+                onIndexAcceptMessage((IndexAcceptDiscoveryMessage)msg);
+            else if (msg instanceof IndexFinishDiscoveryMessage)
+                onIndexFinishMessage((IndexFinishDiscoveryMessage)msg);
+            else
+                U.warn(log, "Unsupported index discovery message type (will ignore): " + msg);
+
             return false;
         }
 
@@ -2741,20 +2740,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         DynamicCacheDescriptor desc = cacheDescriptor(op.space());
 
         if (desc == null) {
-            msg.onError(locNodeId, "Cache doesn't exit on node [cacheName=" + op.space() +
-                ". nodeId=" + locNodeId + ']');
+            msg.onError(locNodeId, "Cache doesn't exit [cacheName=" + op.space() + ", nodeId=" + locNodeId + ']');
 
             return;
         }
 
-        // Validate request at descriptor level.
-        QueryIndexStates idxStates = desc.indexStates();
-
-        if (idxStates == null)
-            idxStates = new QueryIndexStates();
-
-        if (idxStates.propose(locNodeId, msg))
-            desc.indexStates(idxStates);
+        desc.tryProposeFromDiscoveryThread(locNodeId, msg);
     }
 
     /**
@@ -2770,12 +2761,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         if (desc == null)
             return;
 
-        QueryIndexStates idxStates = desc.indexStates();
-
-        if (idxStates == null || !idxStates.accept(msg))
-            return;
-
-        ctx.query().onIndexAccept(op.space(), op);
+        desc.tryAccept(msg, true);
     }
 
     /**
@@ -2784,9 +2770,14 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @param msg Message.
      */
     private void onIndexFinishMessage(IndexFinishDiscoveryMessage msg) {
-        // TODO: Clear dynamic descriptors!
+        AbstractIndexOperation op = msg.operation();
+
+        DynamicCacheDescriptor desc = cacheDescriptor(op.space());
+
+        if (desc == null)
+            return;
 
-        // TODO: Delegate to indexing to handle result and complete client futures!
+        desc.tryFinish(msg, true);
     }
 
     /**
@@ -2858,8 +2849,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                         DynamicCacheDescriptor startDesc =
                             new DynamicCacheDescriptor(ctx, ccfg, req.cacheType(), false, req.deploymentId());
 
-                        startDesc.initialIndexStates(req.indexStates());
-                        startDesc.indexStates(req.indexStates());
+                        startDesc.tryUpdateFromDiscovery(req.indexStates());
 
                         if (newTopVer == null) {
                             newTopVer = new AffinityTopologyVersion(topVer.topologyVersion(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/4376670b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexStates.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexStates.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexStates.java
index 7e8e699..ea36c77 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexStates.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexStates.java
@@ -79,9 +79,8 @@ public class QueryIndexStates implements Serializable {
      *
      * @param locNodeId Local node ID.
      * @param msg Propose message.
-     * @return {@code True} if propose succeeded.
      */
-    public boolean propose(UUID locNodeId, IndexProposeDiscoveryMessage msg) {
+    public void propose(UUID locNodeId, IndexProposeDiscoveryMessage msg) {
         synchronized (mux) {
             AbstractIndexOperation op = msg.operation();
 
@@ -90,13 +89,9 @@ public class QueryIndexStates implements Serializable {
             if (activeOps.containsKey(idxName)) {
                 msg.onError(locNodeId, "Failed to initiate index create/drop because another operation on the same " +
                     "index is in progress: " + idxName);
-
-                return false;
             }
 
             activeOps.put(idxName, new QueryIndexActiveOperation(op));
-
-            return true;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4376670b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java
index c362b91..11d8f93 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java
@@ -38,6 +38,9 @@ public abstract class IndexAbstractDiscoveryMessage implements DiscoveryCustomMe
     @GridToStringInclude
     protected final AbstractIndexOperation op;
 
+    /** Whether request must be propagated to exchange worker for final processing. */
+    private transient boolean exchange;
+
     /** Local cache index state at the moment of message receive. */
     private transient QueryIndexStates idxStates;
 
@@ -76,6 +79,20 @@ public abstract class IndexAbstractDiscoveryMessage implements DiscoveryCustomMe
         this.idxStates = idxStates;
     }
 
+    /**
+     * @return Whether request must be propagated to exchange worker for final processing.
+     */
+    public boolean exchange() {
+        return exchange;
+    }
+
+    /**
+     * @param exchange Whether request must be propagated to exchange worker for final processing.
+     */
+    public void exchange(boolean exchange) {
+        this.exchange = exchange;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(IndexAbstractDiscoveryMessage.class, this);