You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/03/17 11:45:01 UTC
[1/2] ignite git commit: Added initial index state notion and
populated it on cache add events.
Repository: ignite
Updated Branches:
refs/heads/ignite-4565-ddl 2f5706dee -> 4376670bb
Added initial index state notion and populated it on cache add events.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d7bf1b73
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d7bf1b73
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d7bf1b73
Branch: refs/heads/ignite-4565-ddl
Commit: d7bf1b73d00dc38b02a84cfb9096f4d75762a681
Parents: 2f5706d
Author: devozerov <vo...@gridgain.com>
Authored: Thu Mar 16 16:53:00 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Thu Mar 16 16:53:00 2017 +0300
----------------------------------------------------------------------
.../processors/cache/DynamicCacheDescriptor.java | 19 ++++++++++++++++++-
.../processors/cache/GridCacheProcessor.java | 3 ++-
2 files changed, 20 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d7bf1b73/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index f628e76..a90fb72 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -82,6 +82,9 @@ public class DynamicCacheDescriptor {
/** */
private AffinityTopologyVersion rcvdFromVer;
+ /** Initial dynamic index state as observed from cache processor start() method and discovery join process. */
+ private QueryIndexStates initIdxStates;
+
/** Dynamic index states. */
private QueryIndexStates idxStates;
@@ -309,6 +312,20 @@ public class DynamicCacheDescriptor {
/**
* @return Index states.
*/
+ public QueryIndexStates initialIndexStates() {
+ return initIdxStates;
+ }
+
+ /**
+ * @param initIdxStates Index states.
+ */
+ public void initialIndexStates(QueryIndexStates initIdxStates) {
+ this.initIdxStates = initIdxStates != null ? initIdxStates.copy() : null;
+ }
+
+ /**
+ * @return Index states.
+ */
public QueryIndexStates indexStates() {
return idxStates;
}
@@ -317,7 +334,7 @@ public class DynamicCacheDescriptor {
* @param idxStates Index states.
*/
public void indexStates(QueryIndexStates idxStates) {
- this.idxStates = idxStates;
+ this.idxStates = idxStates != null ? idxStates.copy() : null;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/d7bf1b73/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 28cd0a6..8b8f5c7 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2120,7 +2120,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (existing != null) {
if (joiningNodeId.equals(ctx.localNodeId())) {
existing.receivedFrom(req.receivedFrom());
-
existing.deploymentId(req.deploymentId());
}
@@ -2146,6 +2145,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
false,
req.deploymentId());
+ desc.initialIndexStates(req.indexStates());
desc.indexStates(req.indexStates());
// Received statically configured cache.
@@ -2858,6 +2858,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
DynamicCacheDescriptor startDesc =
new DynamicCacheDescriptor(ctx, ccfg, req.cacheType(), false, req.deploymentId());
+ startDesc.initialIndexStates(req.indexStates());
startDesc.indexStates(req.indexStates());
if (newTopVer == null) {
[2/2] ignite git commit: WIP on state concurrency.
Posted by vo...@apache.org.
WIP on state concurrency.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4376670b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4376670b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4376670b
Branch: refs/heads/ignite-4565-ddl
Commit: 4376670bbf6354deafbd787dca10d8cbe26fc7e5
Parents: d7bf1b7
Author: devozerov <vo...@gridgain.com>
Authored: Fri Mar 17 14:44:52 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Fri Mar 17 14:44:52 2017 +0300
----------------------------------------------------------------------
.../cache/CacheAffinitySharedManager.java | 2 +-
.../cache/DynamicCacheChangeRequest.java | 2 +-
.../cache/DynamicCacheDescriptor.java | 120 +++++++++++++++++--
.../processors/cache/GridCacheProcessor.java | 64 +++++-----
.../processors/query/QueryIndexStates.java | 7 +-
.../ddl/IndexAbstractDiscoveryMessage.java | 17 +++
6 files changed, 154 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4376670b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index d2b09a8..0ce8718 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -352,7 +352,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
false,
req.deploymentId());
- desc.indexStates(req.indexStates());
+ desc.tryUpdateFromDiscovery(req.indexStates());
DynamicCacheDescriptor old = registeredCaches.put(cacheId, desc);
http://git-wip-us.apache.org/repos/asf/ignite/blob/4376670b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
index bd2ea17..3610a60 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
@@ -301,7 +301,7 @@ public class DynamicCacheChangeRequest implements Serializable {
* @param idxStates Index states.
*/
public void indexStates(QueryIndexStates idxStates) {
- this.idxStates = idxStates;
+ this.idxStates = idxStates != null ? idxStates.copy() : null;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/4376670b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index a90fb72..a6de0ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -25,10 +25,15 @@ import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.plugin.CachePluginManager;
import org.apache.ignite.internal.processors.query.QueryIndexStates;
+import org.apache.ignite.internal.processors.query.ddl.IndexAcceptDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.ddl.IndexFinishDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.ddl.IndexProposeDiscoveryMessage;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
@@ -82,10 +87,16 @@ public class DynamicCacheDescriptor {
/** */
private AffinityTopologyVersion rcvdFromVer;
- /** Initial dynamic index state as observed from cache processor start() method and discovery join process. */
- private QueryIndexStates initIdxStates;
+ /** Mutex to control index states. */
+ private final Object idxStatesMux = new Object();
- /** Dynamic index states. */
+ /** Initial index states which are used to start the cache. */
+ private QueryIndexStates idxStatesForStart;
+
+ /** Whether index states for start are fixed. */
+ private boolean idxStatesForStartFixed;
+
+ /** Current index states. */
private QueryIndexStates idxStates;
/**
@@ -310,31 +321,114 @@ public class DynamicCacheDescriptor {
}
/**
+ * Get index state for cache start. Once requested it never changes afterwards.
+ *
+ * @return Index states for cache start.
+ */
+ public QueryIndexStates indexStatesForStart() {
+ synchronized (idxStatesMux) {
+ if (!idxStatesForStartFixed) {
+ idxStatesForStart = idxStates;
+
+ idxStatesForStartFixed = true;
+ }
+
+ return idxStatesForStart != null ? idxStatesForStart.copy() : null;
+ }
+ }
+
+ /**
* @return Index states.
*/
- public QueryIndexStates initialIndexStates() {
- return initIdxStates;
+ public QueryIndexStates indexStates() {
+ synchronized (idxStatesMux) {
+ return idxStates != null ? idxStates.copy() : null;
+ }
}
/**
- * @param initIdxStates Index states.
+ * Try updating index state from discovery thread. The update is applied only if the start state is not fixed
+ * yet; otherwise it is silently ignored.
+ *
+ * @param idxStates Index states.
*/
- public void initialIndexStates(QueryIndexStates initIdxStates) {
- this.initIdxStates = initIdxStates != null ? initIdxStates.copy() : null;
+ public void tryUpdateFromDiscovery(QueryIndexStates idxStates) {
+ synchronized (idxStatesMux) {
+ if (!idxStatesForStartFixed)
+ this.idxStates = idxStates != null ? idxStates.copy() : null;
+ }
}
/**
- * @return Index states.
+ * Try performing propose from discovery thread.
+ *
+ * @param locNodeId Local node ID.
+ * @param msg Message.
*/
- public QueryIndexStates indexStates() {
- return idxStates;
+ public void tryProposeFromDiscoveryThread(UUID locNodeId, IndexProposeDiscoveryMessage msg) {
+ synchronized (idxStatesMux) {
+ if (idxStates == null)
+ idxStates = new QueryIndexStates();
+
+ idxStates.propose(locNodeId, msg);
+ }
}
/**
+ * Try applying accept message.
+ *
+ * @param msg Message.
+ * @param disco Whether call is performed from discovery thread.
+ * @return {@code False} if flagged for exchange (discovery call after start state is fixed), else result of accept.
+ */
+ public boolean tryAccept(IndexAcceptDiscoveryMessage msg, boolean disco) {
+ synchronized (idxStatesMux) {
+ if (disco && idxStatesForStartFixed) {
+ msg.exchange(true);
+
+ return false;
+ }
+
+ if (idxStates == null)
+ idxStates = new QueryIndexStates();
+
+ return idxStates.accept(msg);
+ }
+ }
+
+ /**
+ * Try applying finish message.
+ *
+ * @param msg Message.
+ * @param disco Whether call is performed from discovery thread.
+ * @return {@code False} if flagged for exchange (discovery call after start state is fixed), else result of finish.
+ */
+ public boolean tryFinish(IndexFinishDiscoveryMessage msg, boolean disco) {
+ synchronized (idxStatesMux) {
+ if (disco && idxStatesForStartFixed) {
+ msg.exchange(true);
+
+ return false;
+ }
+
+ if (idxStates == null)
+ idxStates = new QueryIndexStates();
+
+ return idxStates.finish(msg);
+ }
+ }
+
+ /**
+ * Forcefully update index states from exchange thread.
+ *
* @param idxStates Index states.
*/
- public void indexStates(QueryIndexStates idxStates) {
- this.idxStates = idxStates != null ? idxStates.copy() : null;
+ public void updateIndexStatesFromExchange(QueryIndexStates idxStates) {
+ synchronized (idxStatesMux) {
+ assert idxStatesForStartFixed;
+
+ this.idxStates = idxStates != null ? idxStates.copy() : null;
+ }
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/4376670b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 8b8f5c7..8ae3283 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -825,7 +825,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
caches.put(maskNull(name), cache);
- startCache(cache, desc.indexStates());
+ startCache(cache, desc.indexStatesForStart());
jCacheProxies.put(maskNull(name), new IgniteCacheProxy(ctx, cache, null, false));
}
@@ -1698,7 +1698,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
null,
desc.deploymentId(),
topVer,
- desc.indexStates()
+ desc.indexStatesForStart()
);
}
}
@@ -2133,7 +2133,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
ccfg.getCacheMode());
}
- existing.indexStates(req.indexStates());
+ existing.tryUpdateFromDiscovery(req.indexStates());
}
else {
assert req.cacheType() != null : req;
@@ -2145,8 +2145,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
false,
req.deploymentId());
- desc.initialIndexStates(req.indexStates());
- desc.indexStates(req.indexStates());
+ desc.tryUpdateFromDiscovery(req.indexStates());
// Received statically configured cache.
if (req.initiatingNodeId() == null)
@@ -2695,21 +2694,21 @@ public class GridCacheProcessor extends GridProcessorAdapter {
IgniteUuid id = msg0.id();
- boolean res = idxDiscoMsgIdHist.add(id);
-
- if (!idxDiscoMsgIdHist.add(id))
+ if (!idxDiscoMsgIdHist.add(id)) {
U.warn(log, "Received duplicate index change discovery message (will ignore): " + msg);
- else {
- if (msg instanceof IndexProposeDiscoveryMessage)
- onIndexProposeMessage((IndexProposeDiscoveryMessage)msg);
- else if (msg instanceof IndexAcceptDiscoveryMessage)
- onIndexAcceptMessage((IndexAcceptDiscoveryMessage)msg);
- else if (msg instanceof IndexFinishDiscoveryMessage)
- onIndexFinishMessage((IndexFinishDiscoveryMessage)msg);
- else
- U.warn(log, "Unsupported index discovery message type (will ignore): " + msg);
+
+ return false;
}
+ if (msg instanceof IndexProposeDiscoveryMessage)
+ onIndexProposeMessage((IndexProposeDiscoveryMessage)msg);
+ else if (msg instanceof IndexAcceptDiscoveryMessage)
+ onIndexAcceptMessage((IndexAcceptDiscoveryMessage)msg);
+ else if (msg instanceof IndexFinishDiscoveryMessage)
+ onIndexFinishMessage((IndexFinishDiscoveryMessage)msg);
+ else
+ U.warn(log, "Unsupported index discovery message type (will ignore): " + msg);
+
return false;
}
@@ -2741,20 +2740,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
DynamicCacheDescriptor desc = cacheDescriptor(op.space());
if (desc == null) {
- msg.onError(locNodeId, "Cache doesn't exit on node [cacheName=" + op.space() +
- ". nodeId=" + locNodeId + ']');
+ msg.onError(locNodeId, "Cache doesn't exit [cacheName=" + op.space() + ", nodeId=" + locNodeId + ']');
return;
}
- // Validate request at descriptor level.
- QueryIndexStates idxStates = desc.indexStates();
-
- if (idxStates == null)
- idxStates = new QueryIndexStates();
-
- if (idxStates.propose(locNodeId, msg))
- desc.indexStates(idxStates);
+ desc.tryProposeFromDiscoveryThread(locNodeId, msg);
}
/**
@@ -2770,12 +2761,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (desc == null)
return;
- QueryIndexStates idxStates = desc.indexStates();
-
- if (idxStates == null || !idxStates.accept(msg))
- return;
-
- ctx.query().onIndexAccept(op.space(), op);
+ desc.tryAccept(msg, true);
}
/**
@@ -2784,9 +2770,14 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @param msg Message.
*/
private void onIndexFinishMessage(IndexFinishDiscoveryMessage msg) {
- // TODO: Clear dynamic descriptors!
+ AbstractIndexOperation op = msg.operation();
+
+ DynamicCacheDescriptor desc = cacheDescriptor(op.space());
+
+ if (desc == null)
+ return;
- // TODO: Delegate to indexing to handle result and complete client futures!
+ desc.tryFinish(msg, true);
}
/**
@@ -2858,8 +2849,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
DynamicCacheDescriptor startDesc =
new DynamicCacheDescriptor(ctx, ccfg, req.cacheType(), false, req.deploymentId());
- startDesc.initialIndexStates(req.indexStates());
- startDesc.indexStates(req.indexStates());
+ startDesc.tryUpdateFromDiscovery(req.indexStates());
if (newTopVer == null) {
newTopVer = new AffinityTopologyVersion(topVer.topologyVersion(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/4376670b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexStates.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexStates.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexStates.java
index 7e8e699..ea36c77 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexStates.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexStates.java
@@ -79,9 +79,8 @@ public class QueryIndexStates implements Serializable {
*
* @param locNodeId Local node ID.
* @param msg Propose message.
- * @return {@code True} if propose succeeded.
*/
- public boolean propose(UUID locNodeId, IndexProposeDiscoveryMessage msg) {
+ public void propose(UUID locNodeId, IndexProposeDiscoveryMessage msg) {
synchronized (mux) {
AbstractIndexOperation op = msg.operation();
@@ -90,13 +89,9 @@ public class QueryIndexStates implements Serializable {
if (activeOps.containsKey(idxName)) {
msg.onError(locNodeId, "Failed to initiate index create/drop because another operation on the same " +
"index is in progress: " + idxName);
-
- return false;
}
activeOps.put(idxName, new QueryIndexActiveOperation(op));
-
- return true;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4376670b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java
index c362b91..11d8f93 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java
@@ -38,6 +38,9 @@ public abstract class IndexAbstractDiscoveryMessage implements DiscoveryCustomMe
@GridToStringInclude
protected final AbstractIndexOperation op;
+ /** Whether request must be propagated to exchange worker for final processing. */
+ private transient boolean exchange;
+
/** Local cache index state at the moment of message receive. */
private transient QueryIndexStates idxStates;
@@ -76,6 +79,20 @@ public abstract class IndexAbstractDiscoveryMessage implements DiscoveryCustomMe
this.idxStates = idxStates;
}
+ /**
+ * @return Whether request must be propagated to exchange worker for final processing.
+ */
+ public boolean exchange() {
+ return exchange;
+ }
+
+ /**
+ * @param exchange Whether request must be propagated to exchange worker for final processing.
+ */
+ public void exchange(boolean exchange) {
+ this.exchange = exchange;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(IndexAbstractDiscoveryMessage.class, this);