You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/18 08:34:14 UTC
[2/6] ignite git commit: Moved logic related to caches discovery data
handling to ClusterCachesInfo. Start of statically configured caches in the
same way as dynamic ones: from GridDhtPartitionsExchangeFuture.
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
index e8094e1..5d82171 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
@@ -44,6 +44,9 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
/** */
private static final long serialVersionUID = 0L;
+ /** */
+ private long futId;
+
/** Topology version. */
private AffinityTopologyVersion topVer;
@@ -69,19 +72,30 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
}
/**
+ * @param futId Future ID.
* @param cacheId Cache ID.
* @param topVer Topology version.
* @param affAssignment Affinity assignment.
*/
- public GridDhtAffinityAssignmentResponse(int cacheId,
+ public GridDhtAffinityAssignmentResponse(
+ long futId,
+ int cacheId,
@NotNull AffinityTopologyVersion topVer,
List<List<ClusterNode>> affAssignment) {
+ this.futId = futId;
this.cacheId = cacheId;
this.topVer = topVer;
affAssignmentIds = ids(affAssignment);
}
+ /**
+ * @return Future ID.
+ */
+ public long futureId() {
+ return futId;
+ }
+
/** {@inheritDoc} */
@Override public boolean partitionExchangeMessage() {
return true;
@@ -181,7 +195,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 6;
+ return 7;
}
/**
@@ -239,12 +253,18 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
writer.incrementState();
case 4:
- if (!writer.writeByteArray("idealAffAssignmentBytes", idealAffAssignmentBytes))
+ if (!writer.writeLong("futId", futId))
return false;
writer.incrementState();
case 5:
+ if (!writer.writeByteArray("idealAffAssignmentBytes", idealAffAssignmentBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 6:
if (!writer.writeMessage("topVer", topVer))
return false;
@@ -275,7 +295,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
reader.incrementState();
case 4:
- idealAffAssignmentBytes = reader.readByteArray("idealAffAssignmentBytes");
+ futId = reader.readLong("futId");
if (!reader.isLastRead())
return false;
@@ -283,6 +303,14 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
reader.incrementState();
case 5:
+ idealAffAssignmentBytes = reader.readByteArray("idealAffAssignmentBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 6:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
index 4f94ae2..741ca5e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
@@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.LinkedList;
import java.util.Queue;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
@@ -32,12 +33,11 @@ import org.apache.ignite.internal.GridNodeOrderComparator;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
@@ -48,9 +48,6 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFF
* Future that fetches affinity assignment from remote cache nodes.
*/
public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffinityAssignmentResponse> {
- /** */
- private static final long serialVersionUID = 0L;
-
/** Logger reference. */
private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
@@ -58,6 +55,9 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
private static IgniteLogger log;
/** */
+ private static final AtomicLong idGen = new AtomicLong();
+
+ /** */
private final GridCacheSharedContext ctx;
/** List of available nodes this future can fetch data from. */
@@ -68,26 +68,33 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
private ClusterNode pendingNode;
/** */
- @GridToStringInclude
- private final T2<Integer, AffinityTopologyVersion> key;
+ private final long id;
+
+ /** */
+ private final AffinityTopologyVersion topVer;
+
+ /** */
+ private final int cacheId;
/**
* @param ctx Context.
- * @param cacheName Cache name.
+ * @param cacheDesc Cache descriptor.
* @param topVer Topology version.
* @param discoCache Discovery cache.
*/
public GridDhtAssignmentFetchFuture(
GridCacheSharedContext ctx,
- String cacheName,
+ DynamicCacheDescriptor cacheDesc,
AffinityTopologyVersion topVer,
DiscoCache discoCache
) {
+ this.topVer = topVer;
+ this.cacheId = cacheDesc.cacheId();
this.ctx = ctx;
- int cacheId = CU.cacheId(cacheName);
- this.key = new T2<>(cacheId, topVer);
- Collection<ClusterNode> availableNodes = discoCache.cacheAffinityNodes(cacheId);
+ id = idGen.getAndIncrement();
+
+ Collection<ClusterNode> availableNodes = discoCache.cacheAffinityNodes(cacheDesc.cacheId());
LinkedList<ClusterNode> tmp = new LinkedList<>();
@@ -105,19 +112,26 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
}
/**
- * Initializes fetch future.
+ * @return Cache ID.
*/
- public void init() {
- ctx.affinity().addDhtAssignmentFetchFuture(this);
+ public int cacheId() {
+ return cacheId;
+ }
- requestFromNextNode();
+ /**
+ * @return Future ID.
+ */
+ public long id() {
+ return id;
}
/**
- * @return Future key.
+ * Initializes fetch future.
*/
- public T2<Integer, AffinityTopologyVersion> key() {
- return key;
+ public void init() {
+ ctx.affinity().addDhtAssignmentFetchFuture(this);
+
+ requestFromNextNode();
}
/**
@@ -125,14 +139,6 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
* @param res Response.
*/
public void onResponse(UUID nodeId, GridDhtAffinityAssignmentResponse res) {
- if (!res.topologyVersion().equals(key.get2())) {
- if (log.isDebugEnabled())
- log.debug("Received affinity assignment for wrong topology version (will ignore) " +
- "[node=" + nodeId + ", res=" + res + ", topVer=" + key.get2() + ']');
-
- return;
- }
-
GridDhtAffinityAssignmentResponse res0 = null;
synchronized (this) {
@@ -188,7 +194,8 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
log0.debug("Sending affinity fetch request to remote node [locNodeId=" + ctx.localNodeId() +
", node=" + node + ']');
- ctx.io().send(node, new GridDhtAffinityAssignmentRequest(key.get1(), key.get2()),
+ ctx.io().send(node,
+ new GridDhtAffinityAssignmentRequest(id, cacheId, topVer),
AFFINITY_POOL);
// Close window for listener notification.
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 4e699b3..8e79eda 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -393,7 +393,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
int num = cctx.affinity().partitions();
if (cctx.rebalanceEnabled()) {
- boolean added = exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion());
+ boolean added = exchFut.cacheAddedOnExchange(cctx.cacheId(), cctx.receivedFrom());
boolean first = (loc.equals(oldest) && loc.id().equals(exchId.nodeId()) && exchId.isJoined()) || added;
@@ -541,7 +541,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
cntrMap.clear();
// If this is the oldest node.
- if (oldest != null && (loc.equals(oldest) || exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion()))) {
+ if (oldest != null && (loc.equals(oldest) || exchFut.cacheAddedOnExchange(cctx.cacheId(), cctx.receivedFrom()))) {
if (node2part == null) {
node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
@@ -1156,10 +1156,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
// If for some nodes current partition has a newer map,
// then we keep the newer value.
if (newPart != null &&
- (newPart.updateSequence() < part.updateSequence() || (
- cctx.startTopologyVersion() != null &&
- newPart.topologyVersion() != null && // Backward compatibility.
- cctx.startTopologyVersion().compareTo(newPart.topologyVersion()) > 0))
+ (newPart.updateSequence() < part.updateSequence() ||
+ (cctx.startTopologyVersion().compareTo(newPart.topologyVersion()) > 0))
) {
if (log.isDebugEnabled())
log.debug("Overriding partition map in full update map [exchId=" + exchId + ", curPart=" +
@@ -1169,7 +1167,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
}
- //remove entry if node left
+ // Remove entry if node left.
for (Iterator<UUID> it = partMap.keySet().iterator(); it.hasNext(); ) {
UUID nodeId = it.next();
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
index 6fb7df6..579796d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
@@ -122,6 +122,8 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
boolean keepBinary,
boolean skipStore
) {
+ assert topVer.topologyVersion() > 0 : topVer;
+
this.cacheId = cacheId;
this.nodeId = nodeId;
this.futId = futId;
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 654d306..b4cb3c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -59,8 +59,8 @@ import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
import org.apache.ignite.internal.processors.cache.ClusterState;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
-import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
+import org.apache.ignite.internal.processors.cache.ExchangeActions;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -110,9 +110,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
public static final int DUMP_PENDING_OBJECTS_THRESHOLD =
IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_DUMP_PENDING_OBJECTS_THRESHOLD, 10);
- /** */
- private static final long serialVersionUID = 0L;
-
/** Dummy flag. */
private final boolean dummy;
@@ -190,8 +187,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
/** Logger. */
private final IgniteLogger log;
- /** Dynamic cache change requests. */
- private Collection<DynamicCacheChangeRequest> reqs;
+ /** Cache change requests. */
+ private ExchangeActions exchActions;
/** */
private CacheAffinityChangeMessage affChangeMsg;
@@ -284,19 +281,20 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
* @param cctx Cache context.
* @param busyLock Busy lock.
* @param exchId Exchange ID.
- * @param reqs Cache change requests.
+ * @param exchActions Cache change requests.
* @param affChangeMsg Affinity change message.
*/
public GridDhtPartitionsExchangeFuture(
GridCacheSharedContext cctx,
ReadWriteLock busyLock,
GridDhtPartitionExchangeId exchId,
- Collection<DynamicCacheChangeRequest> reqs,
+ ExchangeActions exchActions,
CacheAffinityChangeMessage affChangeMsg
) {
assert busyLock != null;
assert exchId != null;
assert exchId.topologyVersion() != null;
+ assert exchActions == null || !exchActions.empty();
dummy = false;
forcePreload = false;
@@ -305,7 +303,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
this.cctx = cctx;
this.busyLock = busyLock;
this.exchId = exchId;
- this.reqs = reqs;
+ this.exchActions = exchActions;
this.affChangeMsg = affChangeMsg;
log = cctx.logger(getClass());
@@ -317,10 +315,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
/**
- * @param reqs Cache change requests.
+ * @param exchActions Exchange actions.
*/
- public void cacheChangeRequests(Collection<DynamicCacheChangeRequest> reqs) {
- this.reqs = reqs;
+ public void exchangeActions(ExchangeActions exchActions) {
+ assert exchActions == null || !exchActions.empty() : exchActions;
+ assert evtLatch != null && evtLatch.getCount() == 1L : this;
+
+ this.exchActions = exchActions;
}
/**
@@ -379,33 +380,19 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
/**
* @param cacheId Cache ID to check.
- * @param topVer Topology version.
+ * @param rcvdFrom ID of the node the cache was received from.
* @return {@code True} if cache was added during this exchange.
*/
- public boolean isCacheAdded(int cacheId, AffinityTopologyVersion topVer) {
- if (cacheStarted(cacheId))
- return true;
-
- GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
-
- return cacheCtx != null && F.eq(cacheCtx.startTopologyVersion(), topVer);
+ public boolean cacheAddedOnExchange(int cacheId, UUID rcvdFrom) {
+ return dynamicCacheStarted(cacheId) || (exchId.isJoined() && exchId.nodeId().equals(rcvdFrom));
}
/**
* @param cacheId Cache ID.
* @return {@code True} if non-client cache was added during this exchange.
*/
- public boolean cacheStarted(int cacheId) {
- if (!F.isEmpty(reqs)) {
- for (DynamicCacheChangeRequest req : reqs) {
- if (req.start() && !req.clientStartOnly()) {
- if (CU.cacheId(req.cacheName()) == cacheId)
- return true;
- }
- }
- }
-
- return false;
+ public boolean dynamicCacheStarted(int cacheId) {
+ return exchActions != null && exchActions.cacheStarted(cacheId);
}
/**
@@ -435,14 +422,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
*
*/
public ClusterState newClusterState() {
- if (!F.isEmpty(reqs)) {
- for (DynamicCacheChangeRequest req : reqs) {
- if (req.globalStateChange())
- return req.state();
- }
- }
-
- return null;
+ return exchActions != null ? exchActions.newClusterState() : null;
}
/**
@@ -524,7 +504,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)discoEvt).customMessage();
if (msg instanceof DynamicCacheChangeBatch){
- assert !F.isEmpty(reqs);
+ assert exchActions != null && !exchActions.empty();
exchange = onCacheChangeRequest(crdNode);
}
@@ -540,10 +520,15 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
else {
if (discoEvt.type() == EVT_NODE_JOINED) {
- Collection<DynamicCacheDescriptor> receivedCaches = cctx.cache().startReceivedCaches(topVer);
+ if (!discoEvt.eventNode().isLocal()) {
+ Collection<DynamicCacheDescriptor> receivedCaches = cctx.cache().startReceivedCaches(
+ discoEvt.eventNode().id(),
+ topVer);
- if (!discoEvt.eventNode().isLocal())
cctx.affinity().initStartedCaches(crdNode, this, receivedCaches);
+ }
+ else
+ cctx.cache().startCachesOnLocalJoin(topVer);
}
exchange = CU.clientNode(discoEvt.eventNode()) ?
@@ -553,20 +538,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
updateTopologies(crdNode);
- if (!F.isEmpty(reqs)) {
- boolean hasStop = false;
-
- for (DynamicCacheChangeRequest req : reqs) {
- if (req.stop()) {
- hasStop = true;
-
- break;
- }
- }
-
- if (hasStop)
- cctx.cache().context().database().beforeCachesStop();
- }
+ if (exchActions != null && exchActions.hasStop())
+ cctx.cache().context().database().beforeCachesStop();
switch (exchange) {
case ALL: {
@@ -654,8 +627,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
GridDhtPartitionTopology top = cacheCtx.topology();
if (crd) {
- boolean updateTop = !cacheCtx.isLocal() &&
- exchId.topologyVersion().equals(cacheCtx.startTopologyVersion());
+ boolean updateTop = exchId.topologyVersion().equals(cacheCtx.startTopologyVersion());
if (updateTop && clientTop != null)
top.update(exchId, clientTop.partitionMap(true), clientTop.updateCounters(false));
@@ -674,32 +646,21 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
* @throws IgniteCheckedException If failed.
*/
private ExchangeType onCacheChangeRequest(boolean crd) throws IgniteCheckedException {
- assert !F.isEmpty(reqs) : this;
+ assert exchActions != null && !exchActions.empty() : this;
GridClusterStateProcessor stateProc = cctx.kernalContext().state();
- if (exchangeOnChangeGlobalState = stateProc.changeGlobalState(reqs, topologyVersion())) {
+ if (exchangeOnChangeGlobalState = stateProc.changeGlobalState(exchActions, topologyVersion())) {
changeGlobalStateE = stateProc.onChangeGlobalState();
if (crd && changeGlobalStateE != null)
changeGlobalStateExceptions.put(cctx.localNodeId(), changeGlobalStateE);
}
- boolean clientOnly = cctx.affinity().onCacheChangeRequest(this, crd, reqs);
-
- if (clientOnly) {
- boolean clientCacheStarted = false;
-
- for (DynamicCacheChangeRequest req : reqs) {
- if (req.start() && req.clientStartOnly() && req.initiatingNodeId().equals(cctx.localNodeId())) {
- clientCacheStarted = true;
+ boolean clientOnly = cctx.affinity().onCacheChangeRequest(this, crd, exchActions);
- break;
- }
- }
-
- return clientCacheStarted ? ExchangeType.CLIENT : ExchangeType.NONE;
- }
+ if (clientOnly)
+ return exchActions.clientCacheStarted(cctx.localNodeId()) ? ExchangeType.CLIENT : ExchangeType.NONE;
else
return cctx.kernalContext().clientNode() ? ExchangeType.CLIENT : ExchangeType.ALL;
}
@@ -768,7 +729,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
private void clientOnlyExchange() throws IgniteCheckedException {
clientOnlyExchange = true;
- //todo checl invoke on client
if (crd != null) {
if (crd.isLocal()) {
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
@@ -1046,19 +1006,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
* @return {@code True} if cache is stopping by this exchange.
*/
public boolean stopping(int cacheId) {
- boolean stopping = false;
-
- if (!F.isEmpty(reqs)) {
- for (DynamicCacheChangeRequest req : reqs) {
- if (cacheId == CU.cacheId(req.cacheName())) {
- stopping = req.stop();
-
- break;
- }
- }
- }
-
- return stopping;
+ return exchActions != null && exchActions.cacheStopped(cacheId);
}
/**
@@ -1069,13 +1017,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
assert node != null;
// Reset lost partition before send local partition to coordinator.
- if (!F.isEmpty(reqs)) {
- Set<String> caches = new HashSet<>();
-
- for (DynamicCacheChangeRequest req : reqs) {
- if (req.resetLostPartitions())
- caches.add(req.cacheName());
- }
+ if (exchActions != null) {
+ Set<String> caches = exchActions.cachesToResetLostPartitions();
if (!F.isEmpty(caches))
resetLostPartitions(caches);
@@ -1203,14 +1146,12 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
cacheValidRes = m;
}
- cctx.cache().onExchangeDone(exchId.topologyVersion(), reqs, err);
+ cctx.cache().onExchangeDone(exchId.topologyVersion(), exchActions, err);
cctx.exchange().onExchangeDone(this, err);
- if (!F.isEmpty(reqs) && err == null) {
- for (DynamicCacheChangeRequest req : reqs)
- cctx.cache().completeStartFuture(req);
- }
+ if (exchActions != null && err == null)
+ exchActions.completeRequestFutures(cctx);
if (exchangeOnChangeGlobalState && err == null)
cctx.kernalContext().state().onExchangeDone();
@@ -1227,7 +1168,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
cacheCtx.config().getAffinity().removeNode(exchId.nodeId());
}
- reqs = null;
+ exchActions = null;
if (discoEvt instanceof DiscoveryCustomEvent)
((DiscoveryCustomEvent)discoEvt).customMessage(null);
@@ -1615,20 +1556,15 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
assert discoEvt instanceof DiscoveryCustomEvent;
if (((DiscoveryCustomEvent)discoEvt).customMessage() instanceof DynamicCacheChangeBatch) {
- DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)((DiscoveryCustomEvent)discoEvt)
- .customMessage();
+ if (exchActions != null) {
+ if (exchActions.newClusterState() == ClusterState.ACTIVE)
+ assignPartitionsStates();
- Set<String> caches = new HashSet<>();
+ Set<String> caches = exchActions.cachesToResetLostPartitions();
- for (DynamicCacheChangeRequest req : batch.requests()) {
- if (req.resetLostPartitions())
- caches.add(req.cacheName());
- else if (req.globalStateChange() && req.state() != ClusterState.INACTIVE)
- assignPartitionsStates();
+ if (!F.isEmpty(caches))
+ resetLostPartitions(caches);
}
-
- if (!F.isEmpty(caches))
- resetLostPartitions(caches);
}
}
else if (discoEvt.type() == EVT_NODE_LEFT || discoEvt.type() == EVT_NODE_FAILED)
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 9f1b96e..57616ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -608,7 +608,9 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
AffinityAssignment assignment = cctx.affinity().assignment(topVer);
- GridDhtAffinityAssignmentResponse res = new GridDhtAffinityAssignmentResponse(cctx.cacheId(),
+ GridDhtAffinityAssignmentResponse res = new GridDhtAffinityAssignmentResponse(
+ req.futureId(),
+ cctx.cacheId(),
topVer,
assignment.assignment());
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
index 5e3dc3f..8b538ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
@@ -146,35 +146,8 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> {
GridLocalLockFuture<K, V> fut = new GridLocalLockFuture<>(ctx, keys, tx, this, timeout, filter);
try {
- for (KeyCacheObject key : keys) {
- while (true) {
- GridLocalCacheEntry entry = null;
-
- try {
- entry = entryExx(key);
-
- entry.unswap(false);
-
- if (!ctx.isAll(entry, filter)) {
- fut.onFailed();
-
- return fut;
- }
-
- // Removed exception may be thrown here.
- GridCacheMvccCandidate cand = fut.addEntry(entry);
-
- if (cand == null && fut.isDone())
- return fut;
-
- break;
- }
- catch (GridCacheEntryRemovedException ignored) {
- if (log().isDebugEnabled())
- log().debug("Got removed entry in lockAsync(..) method (will retry): " + entry);
- }
- }
- }
+ if (!fut.addEntries(keys))
+ return fut;
if (!ctx.mvcc().addFuture(fut))
fut.onError(new IgniteCheckedException("Duplicate future ID (internal error): " + fut));
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
index 59d0adb..9641533 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
@@ -144,12 +144,51 @@ public final class GridLocalLockFuture<K, V> extends GridCacheFutureAdapter<Bool
if (log == null)
log = U.logger(cctx.kernalContext(), logRef, GridLocalLockFuture.class);
+ }
+
+ /**
+ * @param keys Keys.
+ * @return {@code False} in case of error.
+ * @throws IgniteCheckedException If failed.
+ */
+ public boolean addEntries(Collection<KeyCacheObject> keys) throws IgniteCheckedException {
+ for (KeyCacheObject key : keys) {
+ while (true) {
+ GridLocalCacheEntry entry = null;
+
+ try {
+ entry = cache.entryExx(key);
+
+ entry.unswap(false);
+
+ if (!cctx.isAll(entry, filter)) {
+ onFailed();
+
+ return false;
+ }
+
+ // Removed exception may be thrown here.
+ GridCacheMvccCandidate cand = addEntry(entry);
+
+ if (cand == null && isDone())
+ return false;
+
+ break;
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Got removed entry in lockAsync(..) method (will retry): " + entry);
+ }
+ }
+ }
if (timeout > 0) {
timeoutObj = new LockTimeoutObject();
cctx.time().addTimeoutObject(timeoutObj);
}
+
+ return true;
}
/** {@inheritDoc} */
@@ -216,7 +255,7 @@ public final class GridLocalLockFuture<K, V> extends GridCacheFutureAdapter<Bool
* @return Lock candidate.
* @throws GridCacheEntryRemovedException If entry was removed.
*/
- @Nullable GridCacheMvccCandidate addEntry(GridLocalCacheEntry entry)
+ private @Nullable GridCacheMvccCandidate addEntry(GridLocalCacheEntry entry)
throws GridCacheEntryRemovedException {
// Add local lock first, as it may throw GridCacheEntryRemovedException.
GridCacheMvccCandidate c = entry.addLocal(
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 0f7b0df..fcf534c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -272,8 +272,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
qryTopVer = cctx.startTopologyVersion();
- if (qryTopVer == null)
- qryTopVer = new AffinityTopologyVersion(cctx.localNode().order(), 0);
+ assert qryTopVer != null : cctx.name();
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index efb02c6..e7706dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -561,16 +561,19 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
}
/**
- * Wait topology.
+ * @param ctx Context.
+ * @throws IgniteCheckedException In case of error.
*/
- public void waitTopologyFuture(GridKernalContext ctx) throws IgniteCheckedException {
+ void waitTopologyFuture(GridKernalContext ctx) throws IgniteCheckedException {
GridCacheContext<K, V> cctx = cacheContext(ctx);
if (!cctx.isLocal()) {
- cacheContext(ctx).affinity().affinityReadyFuture(initTopVer).get();
+ AffinityTopologyVersion topVer = initTopVer;
+
+ cacheContext(ctx).affinity().affinityReadyFuture(topVer).get();
for (int partId = 0; partId < cacheContext(ctx).affinity().partitions(); partId++)
- getOrCreatePartitionRecovery(ctx, partId);
+ getOrCreatePartitionRecovery(ctx, partId, topVer);
}
}
@@ -736,7 +739,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
Collections.<CacheEntryEvent<? extends K, ? extends V>>emptyList();
}
- PartitionRecovery rec = getOrCreatePartitionRecovery(ctx, e.partition());
+ PartitionRecovery rec = getOrCreatePartitionRecovery(ctx, e.partition(), e.topologyVersion());
return rec.collectEntries(e, cctx, cache);
}
@@ -869,37 +872,40 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
/**
* @param ctx Context.
* @param partId Partition id.
+ * @param topVer Topology version for current operation.
* @return Partition recovery.
*/
- @NotNull private PartitionRecovery getOrCreatePartitionRecovery(GridKernalContext ctx, int partId) {
+ @NotNull private PartitionRecovery getOrCreatePartitionRecovery(GridKernalContext ctx,
+ int partId,
+ AffinityTopologyVersion topVer) {
+ assert topVer != null && topVer.topologyVersion() > 0 : topVer;
+
PartitionRecovery rec = rcvs.get(partId);
if (rec == null) {
T2<Long, Long> partCntrs = null;
- AffinityTopologyVersion initTopVer0 = initTopVer;
+ Map<UUID, Map<Integer, T2<Long, Long>>> initUpdCntrsPerNode = this.initUpdCntrsPerNode;
- if (initTopVer0 != null) {
+ if (initUpdCntrsPerNode != null) {
GridCacheContext<K, V> cctx = cacheContext(ctx);
GridCacheAffinityManager aff = cctx.affinity();
- if (initUpdCntrsPerNode != null) {
- for (ClusterNode node : aff.nodesByPartition(partId, initTopVer)) {
- Map<Integer, T2<Long, Long>> map = initUpdCntrsPerNode.get(node.id());
+ for (ClusterNode node : aff.nodesByPartition(partId, topVer)) {
+ Map<Integer, T2<Long, Long>> map = initUpdCntrsPerNode.get(node.id());
- if (map != null) {
- partCntrs = map.get(partId);
+ if (map != null) {
+ partCntrs = map.get(partId);
- break;
- }
+ break;
}
}
- else if (initUpdCntrs != null)
- partCntrs = initUpdCntrs.get(partId);
}
+ else if (initUpdCntrs != null)
+ partCntrs = initUpdCntrs.get(partId);
- rec = new PartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), initTopVer0,
+ rec = new PartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), topVer,
partCntrs != null ? partCntrs.get2() : null);
PartitionRecovery oldRec = rcvs.putIfAbsent(partId, rec);
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 8377754..acf351f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
+import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
@@ -340,8 +341,14 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
updateCntr,
topVer);
- CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
- cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
+ IgniteCacheProxy jcache = cctx.kernalContext().cache().jcacheProxy(cctx.name());
+
+ assert jcache != null : "Failed to get cache proxy [name=" + cctx.name() +
+ ", locStart=" + cctx.startTopologyVersion() +
+ ", locNode=" + cctx.localNode() +
+ ", stopping=" + cctx.kernalContext().isStopping();
+
+ CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(jcache, cctx, e0);
lsnr.onEntryUpdated(evt, primary, recordIgniteEvt, fut);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index 1286ba9..b25b229 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -49,6 +49,7 @@ import org.apache.ignite.internal.processors.cache.ChangeGlobalStateMessage;
import org.apache.ignite.internal.processors.cache.ClusterState;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
+import org.apache.ignite.internal.processors.cache.ExchangeActions;
import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.GridChangeGlobalStateMessageResponse;
@@ -279,9 +280,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
List<DynamicCacheChangeRequest> reqs = new ArrayList<>();
DynamicCacheChangeRequest changeGlobalStateReq = new DynamicCacheChangeRequest(
- requestId, null, ctx.localNodeId());
-
- changeGlobalStateReq.state(activate ? ACTIVE : INACTIVE);
+ requestId, activate ? ACTIVE : INACTIVE, ctx.localNodeId());
reqs.add(changeGlobalStateReq);
@@ -329,26 +328,25 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
}
/**
- * @param reqs Requests.
+ * @param exchActions Exchange actions.
+ * @param topVer Exchange topology version.
*/
public boolean changeGlobalState(
- Collection<DynamicCacheChangeRequest> reqs,
+ ExchangeActions exchActions,
AffinityTopologyVersion topVer
) {
- assert !F.isEmpty(reqs);
+ assert exchActions != null;
assert topVer != null;
- for (DynamicCacheChangeRequest req : reqs)
- if (req.globalStateChange()) {
- ChangeGlobalStateContext cgsCtx = lastCgsCtx;
-
- assert cgsCtx != null : "reqs: " + Arrays.toString(reqs.toArray());
+ if (exchActions.newClusterState() != null) {
+ ChangeGlobalStateContext cgsCtx = lastCgsCtx;
- cgsCtx.topologyVersion(topVer);
+ assert cgsCtx != null : exchActions;
- return true;
- }
+ cgsCtx.topologyVersion(topVer);
+ return true;
+ }
return false;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index 81f5c28..59c2656 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -427,7 +427,7 @@ public abstract class IgniteSpiAdapter implements IgniteSpi {
*/
protected final void unregisterMBean() throws IgniteSpiException {
// Unregister SPI MBean.
- if (spiMBean != null) {
+ if (spiMBean != null && ignite != null) {
MBeanServer jmx = ignite.configuration().getMBeanServer();
assert jmx != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java
index 96df255..803beed 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java
@@ -188,14 +188,15 @@ public class DiscoveryDataBag {
}
/**
- *
+ * @return ID of joining node.
*/
public UUID joiningNodeId() {
return joiningNodeId;
}
/**
- * @param cmpId component ID.
+ * @param cmpId Component ID.
+ * @return Discovery data for given component.
*/
public GridDiscoveryData gridDiscoveryData(int cmpId) {
if (gridData == null)
@@ -207,7 +208,8 @@ public class DiscoveryDataBag {
}
/**
- * @param cmpId component ID.
+ * @param cmpId Component ID.
+ * @return Joining node discovery data.
*/
public JoiningNodeDiscoveryData newJoinerDiscoveryData(int cmpId) {
if (newJoinerData == null)
@@ -219,7 +221,7 @@ public class DiscoveryDataBag {
}
/**
- * @param cmpId component ID.
+ * @param cmpId Component ID.
* @param data Data.
*/
public void addJoiningNodeData(Integer cmpId, Serializable data) {
@@ -227,7 +229,7 @@ public class DiscoveryDataBag {
}
/**
- * @param cmpId component ID.
+ * @param cmpId Component ID.
* @param data Data.
*/
public void addGridCommonData(Integer cmpId, Serializable data) {
@@ -235,7 +237,7 @@ public class DiscoveryDataBag {
}
/**
- * @param cmpId component ID.
+ * @param cmpId Component ID.
* @param data Data.
*/
public void addNodeSpecificData(Integer cmpId, Serializable data) {
@@ -246,7 +248,8 @@ public class DiscoveryDataBag {
}
/**
- * @param cmpId component ID.
+ * @param cmpId Component ID.
+ * @return {@code True} if common data collected for given component.
*/
public boolean commonDataCollectedFor(Integer cmpId) {
assert cmnDataInitializedCmps != null;
@@ -295,5 +298,4 @@ public class DiscoveryDataBag {
@Nullable public Map<Integer, Serializable> localNodeSpecificData() {
return nodeSpecificData.get(DEFAULT_KEY);
}
-
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index cc581e1..2a55412 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1981,7 +1981,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi {
*
*/
void printStopInfo() {
- if (log.isDebugEnabled())
+ IgniteLogger log = this.log;
+
+ if (log != null && log.isDebugEnabled())
log.debug(stopInfo());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/config/examples.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/test/config/examples.properties b/modules/core/src/test/config/examples.properties
index c584f72..ea0d8ed 100644
--- a/modules/core/src/test/config/examples.properties
+++ b/modules/core/src/test/config/examples.properties
@@ -22,3 +22,4 @@ ScalarCacheExample=examples/config/example-ignite.xml
ScalarCacheQueryExample=examples/config/example-ignite.xml
ScalarCountGraphTrianglesExample=examples/config/example-ignite.xml
ScalarPopularNumbersRealTimeExample=examples/config/example-ignite.xml
+MemoryPolicyExample=examples/config/example-memory-policies.xml
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java
index 2110c28..99e80ca 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java
@@ -78,13 +78,13 @@ public abstract class AffinityFunctionBackupFilterAbstractSelfTest extends GridC
Map<String, Integer> backupAssignedAttribute = getAttributeStatistic(assigned);
- String nodeAttributeValue = node.attribute(SPLIT_ATTRIBUTE_NAME);
+ String nodeAttributeVal = node.attribute(SPLIT_ATTRIBUTE_NAME);
- if (FIRST_NODE_GROUP.equals(nodeAttributeValue)
+ if (FIRST_NODE_GROUP.equals(nodeAttributeVal)
&& backupAssignedAttribute.get(FIRST_NODE_GROUP) < 2)
return true;
- return backupAssignedAttribute.get(nodeAttributeValue).equals(0);
+ return backupAssignedAttribute.get(nodeAttributeVal).equals(0);
}
};
@@ -107,10 +107,11 @@ public abstract class AffinityFunctionBackupFilterAbstractSelfTest extends GridC
String val = assignedNode.attribute(SPLIT_ATTRIBUTE_NAME);
- Integer count = backupAssignedAttribute.get(val);
+ Integer cnt = backupAssignedAttribute.get(val);
- backupAssignedAttribute.put(val, count + 1);
+ backupAssignedAttribute.put(val, cnt + 1);
}
+
return backupAssignedAttribute;
}
@@ -157,6 +158,7 @@ public abstract class AffinityFunctionBackupFilterAbstractSelfTest extends GridC
*/
public void testPartitionDistribution() throws Exception {
backups = 1;
+
try {
for (int i = 0; i < 3; i++) {
splitAttrVal = "A";
@@ -205,6 +207,7 @@ public abstract class AffinityFunctionBackupFilterAbstractSelfTest extends GridC
*/
public void testPartitionDistributionWithAffinityBackupFilter() throws Exception {
backups = 3;
+
try {
for (int i = 0; i < 2; i++) {
splitAttrVal = FIRST_NODE_GROUP;
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/internal/GridNodeMetricsLogSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridNodeMetricsLogSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridNodeMetricsLogSelfTest.java
index 13fae24..b6114d1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridNodeMetricsLogSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridNodeMetricsLogSelfTest.java
@@ -35,12 +35,6 @@ import org.apache.log4j.WriterAppender;
@SuppressWarnings({"ProhibitedExceptionDeclared"})
@GridCommonTest(group = "Kernal")
public class GridNodeMetricsLogSelfTest extends GridCommonAbstractTest {
- /** */
-
- public GridNodeMetricsLogSelfTest() {
- super(false);
- }
-
/** {@inheritDoc} */
@SuppressWarnings({"unchecked"})
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
@@ -51,6 +45,7 @@ public class GridNodeMetricsLogSelfTest extends GridCommonAbstractTest {
return cfg;
}
+ /** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
stopAllGrids();
}
@@ -80,11 +75,11 @@ public class GridNodeMetricsLogSelfTest extends GridCommonAbstractTest {
cache2.put(2, "two");
- Thread.sleep(10000);
+ Thread.sleep(10_000);
//Check that nodes are alie
- assert cache1.get(1).equals("one");
- assert cache2.get(2).equals("two");
+ assertEquals("one", cache1.get(1));
+ assertEquals("two", cache2.get(2));
String fullLog = strWr.toString();
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java
index 7dce36b..ae9986d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.cache.Cache;
+import javax.cache.configuration.Factory;
import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.expiry.Duration;
import javax.cache.integration.CacheLoaderException;
@@ -53,6 +54,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -169,18 +171,7 @@ public abstract class GridCacheAbstractLocalStoreSelfTest extends GridCommonAbst
cacheCfg.setRebalanceMode(SYNC);
- if (igniteInstanceName.endsWith("1"))
- cacheCfg.setCacheStoreFactory(singletonFactory(LOCAL_STORE_1));
- else if (igniteInstanceName.endsWith("2"))
- cacheCfg.setCacheStoreFactory(singletonFactory(LOCAL_STORE_2));
- else if (igniteInstanceName.endsWith("3"))
- cacheCfg.setCacheStoreFactory(singletonFactory(LOCAL_STORE_3));
- else if (igniteInstanceName.endsWith("4"))
- cacheCfg.setCacheStoreFactory(singletonFactory(LOCAL_STORE_4));
- else if (igniteInstanceName.endsWith("5"))
- cacheCfg.setCacheStoreFactory(singletonFactory(LOCAL_STORE_5));
- else
- cacheCfg.setCacheStoreFactory(singletonFactory(LOCAL_STORE_6));
+ cacheCfg.setCacheStoreFactory(new StoreFactory());
cacheCfg.setWriteThrough(true);
cacheCfg.setReadThrough(true);
@@ -840,4 +831,30 @@ public abstract class GridCacheAbstractLocalStoreSelfTest extends GridCommonAbst
map.clear();
}
}
+
+ /**
+ * Store factory returning the {@code LOCAL_STORE_*} instance matching the Ignite instance name suffix.
+ */
+ static class StoreFactory implements Factory<CacheStore> {
+ /** Ignite instance resource; its configured instance name selects the store. */
+ @IgniteInstanceResource
+ private Ignite node;
+
+ @Override public CacheStore create() {
+ String igniteInstanceName = node.configuration().getIgniteInstanceName();
+
+ if (igniteInstanceName.endsWith("1"))
+ return LOCAL_STORE_1;
+ else if (igniteInstanceName.endsWith("2"))
+ return LOCAL_STORE_2;
+ else if (igniteInstanceName.endsWith("3"))
+ return LOCAL_STORE_3;
+ else if (igniteInstanceName.endsWith("4"))
+ return LOCAL_STORE_4;
+ else if (igniteInstanceName.endsWith("5"))
+ return LOCAL_STORE_5;
+ else
+ return LOCAL_STORE_6;
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingNearErrorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingNearErrorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingNearErrorTest.java
index a4a831f..546ec06 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingNearErrorTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingNearErrorTest.java
@@ -44,13 +44,13 @@ public class IgniteCacheP2pUnmarshallingNearErrorTest extends IgniteCacheP2pUnma
/** {@inheritDoc} */
@Override public void testResponseMessageOnUnmarshallingFailed() throws InterruptedException {
- //GridCacheEvictionRequest unmarshalling failed test
- readCnt.set(5); //2 for each put
+ //GridCacheEvictionRequest unmarshalling failed test.
+ readCnt.set(5); //2 for each put.
jcache(0).put(new TestKey(String.valueOf(++key)), "");
jcache(0).put(new TestKey(String.valueOf(++key)), "");
- //Eviction request unmarshalling failed but ioManager does not hangs up.
+ // Eviction request unmarshalling failed but ioManager does not hang up.
// Wait for eviction complete.
Thread.sleep(1000);
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStartTest.java
new file mode 100644
index 0000000..eb8077f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStartTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Checks on which nodes a cache is started after dynamic create, node join and destroy.
+ */
+public class IgniteCacheStartTest extends GridCommonAbstractTest {
+ /** IP finder set on the discovery SPI of every started node. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** Name of the cache under test. */
+ private static final String CACHE_NAME = "c1";
+
+ /** Client mode flag applied to configurations of subsequently started nodes. */
+ private boolean client;
+
+ /** Cache configuration added to subsequently started nodes; ignored when {@code null}. */
+ private CacheConfiguration ccfg;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+ cfg.setClientMode(client);
+
+ if (ccfg != null)
+ cfg.setCacheConfiguration(ccfg);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ super.afterTest();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings("unchecked")
+ public void testStartAndNodeJoin() throws Exception {
+ Ignite node0 = startGrid(0);
+
+ checkCache(0, CACHE_NAME, false);
+
+ node0.createCache(cacheConfiguration(CACHE_NAME));
+
+ checkCache(0, CACHE_NAME, true);
+
+ startGrid(1);
+
+ checkCache(0, CACHE_NAME, true);
+ checkCache(1, CACHE_NAME, true);
+
+ client = true;
+
+ startGrid(2);
+
+ checkCache(0, CACHE_NAME, true);
+ checkCache(1, CACHE_NAME, true);
+ checkCache(2, CACHE_NAME, false);
+
+ ignite(2).destroyCache(CACHE_NAME);
+
+ checkCache(0, CACHE_NAME, false);
+ checkCache(1, CACHE_NAME, false);
+ checkCache(2, CACHE_NAME, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testStartFromJoiningNode1() throws Exception {
+ checkStartFromJoiningNode(false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testStartFromJoiningNode2() throws Exception {
+ checkStartFromJoiningNode(true);
+ }
+
+ /**
+ * @param joinClient {@code True} if the node carrying the cache configuration joins as a client.
+ * @throws Exception If failed.
+ */
+ private void checkStartFromJoiningNode(boolean joinClient) throws Exception {
+ startGrid(0);
+ startGrid(1);
+
+ client = true;
+
+ startGrid(2);
+
+ ccfg = cacheConfiguration(CACHE_NAME);
+ client = joinClient;
+
+ startGrid(3);
+
+ checkCache(0, CACHE_NAME, true);
+ checkCache(1, CACHE_NAME, true);
+ checkCache(2, CACHE_NAME, false);
+ checkCache(3, CACHE_NAME, true);
+
+ client = false;
+ ccfg = null;
+
+ startGrid(4);
+
+ checkCache(0, CACHE_NAME, true);
+ checkCache(1, CACHE_NAME, true);
+ checkCache(2, CACHE_NAME, false);
+ checkCache(3, CACHE_NAME, true);
+ checkCache(4, CACHE_NAME, true);
+
+ client = true;
+
+ startGrid(5);
+
+ checkCache(0, CACHE_NAME, true);
+ checkCache(1, CACHE_NAME, true);
+ checkCache(2, CACHE_NAME, false);
+ checkCache(3, CACHE_NAME, true);
+ checkCache(4, CACHE_NAME, true);
+ checkCache(5, CACHE_NAME, false);
+
+ ignite(5).destroyCache(CACHE_NAME);
+
+ for (int i = 0; i < 5; i++)
+ checkCache(i, CACHE_NAME, false);
+ }
+
+ /**
+ * @param cacheName Cache name.
+ * @return Cache configuration.
+ */
+ private CacheConfiguration cacheConfiguration(String cacheName) {
+ return new CacheConfiguration(cacheName);
+ }
+
+ /**
+ * @param idx Node index.
+ * @param cacheName Cache name.
+ * @param expCache {@code True} if the node is expected to have the cache instance.
+ */
+ private void checkCache(int idx, final String cacheName, final boolean expCache) throws IgniteInterruptedCheckedException {
+ final IgniteKernal node = (IgniteKernal)ignite(idx);
+
+ assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return expCache == (node.context().cache().cache(cacheName) != null);
+ }
+ }, 1000));
+
+ assertNotNull(node.context().cache().cache(CU.UTILITY_CACHE_NAME));
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
index 4a34a1d..e7c5ca5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
@@ -1027,7 +1027,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
* @param nearOnly Near only flag.
* @throws Exception If failed.
*/
- public void checkGetOrCreateNear(final boolean nearOnly) throws Exception {
+ private void checkGetOrCreateNear(final boolean nearOnly) throws Exception {
try {
final AtomicInteger cnt = new AtomicInteger(nodeCount());
final AtomicReference<Throwable> err = new AtomicReference<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java
index fd77309..057b0d6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java
@@ -130,7 +130,8 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
/**
* Tests topology split scenario.
- * @throws Exception
+ *
+ * @throws Exception If failed.
*/
public void testTopologyValidator() throws Exception {
assertTrue(initLatch.await(10, TimeUnit.SECONDS));
@@ -242,12 +243,15 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
/** */
private static final long serialVersionUID = 0L;
+ /** */
@CacheNameResource
private String cacheName;
+ /** */
@IgniteInstanceResource
private Ignite ignite;
+ /** */
@LoggerResource
private IgniteLogger log;
@@ -263,7 +267,7 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
}).isEmpty())
return false;
- IgniteKernal kernal = (IgniteKernal)ignite.cache(cacheName).unwrap(Ignite.class);
+ IgniteKernal kernal = (IgniteKernal)ignite;
GridDhtCacheAdapter<Object, Object> dht = kernal.context().cache().internalCache(cacheName).context().dht();
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheDiscoveryDataConcurrentJoinTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheDiscoveryDataConcurrentJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheDiscoveryDataConcurrentJoinTest.java
new file mode 100644
index 0000000..a80830a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheDiscoveryDataConcurrentJoinTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.util.GridAtomicInteger;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryJoinRequestMessage;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+
+/**
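+ * Tests that statically configured caches are started on all nodes when several nodes join concurrently.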
+ */
+@SuppressWarnings("unchecked")
+public class CacheDiscoveryDataConcurrentJoinTest extends GridCommonAbstractTest {
+ /** IP finder shared by the discovery SPIs of all nodes started in this test. */
+ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** Number of test iterations. */
+ private static final int ITERATIONS = 3;
+
+ /** Client mode flag applied to each node configuration. */
+ private boolean client;
+
+ /** Number of static caches to configure for the node being started by the current thread. */
+ private ThreadLocal<Integer> staticCaches = new ThreadLocal<>();
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ TcpDiscoverySpi testSpi = new TcpDiscoverySpi() {
+ /** Set until the first join request has been delayed, so that only one request is delayed. */
+ private boolean delay = true;
+
+ @Override protected void startMessageProcess(TcpDiscoveryAbstractMessage msg) {
+ if (getTestIgniteInstanceName(0).equals(ignite.name())) {
+ if (msg instanceof TcpDiscoveryJoinRequestMessage) {
+ TcpDiscoveryJoinRequestMessage msg0 = (TcpDiscoveryJoinRequestMessage)msg;
+
+ if (delay) {
+ log.info("Delay join processing: " + msg0);
+
+ delay = false;
+
+ doSleep(5000);
+ }
+ }
+ }
+
+ super.startMessageProcess(msg);
+ }
+ };
+
+ testSpi.setIpFinder(ipFinder);
+ testSpi.setJoinTimeout(60_000);
+
+ cfg.setDiscoverySpi(testSpi);
+
+ cfg.setClientMode(client);
+
+ Integer caches = staticCaches.get();
+
+ if (caches != null) {
+ cfg.setCacheConfiguration(cacheConfigurations(caches).toArray(new CacheConfiguration[caches]));
+
+ staticCaches.remove();
+ }
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 10 * 60 * 1000L;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ super.afterTest();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testConcurrentJoin() throws Exception {
+ for (int iter = 0; iter < ITERATIONS; iter++) {
+ log.info("Iteration: " + iter);
+
+ final int NODES = 6;
+ final int MAX_CACHES = 10;
+
+ final GridAtomicInteger caches = new GridAtomicInteger();
+
+ startGrid(0);
+
+ final AtomicInteger idx = new AtomicInteger(1);
+
+ GridTestUtils.runMultiThreaded(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ int c = ThreadLocalRandom.current().nextInt(MAX_CACHES) + 1;
+
+ staticCaches.set(c);
+
+ startGrid(idx.getAndIncrement());
+
+ caches.setIfGreater(c);
+
+ return null;
+ }
+ }, NODES - 1, "start-node");
+
+ assertTrue(caches.get() > 0);
+
+ for (int i = 0; i < NODES; i++) {
+ Ignite node = ignite(i);
+
+ for (int c = 0; c < caches.get(); c++) {
+ Collection<ClusterNode> nodes = node.cluster().forCacheNodes("cache-" + c).nodes();
+
+ assertEquals(NODES, nodes.size());
+
+ checkCache(node, "cache-" + c);
+ }
+ }
+
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @param caches Number of caches.
+ * @return Cache configurations.
+ */
+ private Collection<CacheConfiguration> cacheConfigurations(int caches) {
+ List<CacheConfiguration> ccfgs = new ArrayList<>();
+
+ for (int i = 0; i < caches; i++)
+ ccfgs.add(cacheConfiguration("cache-" + i));
+
+ return ccfgs;
+ }
+
+ /**
+ * @param cacheName Cache name.
+ * @return Cache configuration.
+ */
+ private CacheConfiguration cacheConfiguration(String cacheName) {
+ CacheConfiguration ccfg = new CacheConfiguration(cacheName);
+
+ ccfg.setName(cacheName);
+ ccfg.setAtomicityMode(TRANSACTIONAL);
+ ccfg.setAffinity(new RendezvousAffinityFunction(false, 16));
+
+ return ccfg;
+ }
+ /**
+ * @param node Node.
+ * @param cacheName Cache name.
+ */
+ private void checkCache(Ignite node, final String cacheName) {
+ assertNotNull(((IgniteKernal)node).context().cache().cache(cacheName));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
index fed388a..bc435e2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
@@ -2165,7 +2165,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
Map<String, List<List<ClusterNode>>> aff = new HashMap<>();
for (Ignite node : nodes) {
- log.info("Check node: " + node.name());
+ log.info("Check affinity [node=" + node.name() + ", topVer=" + topVer + ", expIdeal=" + expIdeal + ']');
IgniteKernal node0 = (IgniteKernal)node;
@@ -2175,7 +2175,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
fut.get();
for (GridCacheContext cctx : node0.context().cache().context().cacheContexts()) {
- if (cctx.startTopologyVersion() != null && cctx.startTopologyVersion().compareTo(topVer) > 0)
+ if (cctx.startTopologyVersion().compareTo(topVer) > 0)
continue;
List<List<ClusterNode>> aff1 = aff.get(cctx.name());
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java
index 321faf8..88df607 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java
@@ -34,6 +34,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
@@ -210,6 +211,8 @@ public class CacheStartOnJoinTest extends GridCommonAbstractTest {
Collection<ClusterNode> nodes = node.cluster().forCacheNodes("cache-" + c).nodes();
assertEquals(NODES, nodes.size());
+
+ checkCache(node, "cache-" + c);
}
for (int c = 0; c < 5; c++) {
@@ -247,4 +250,11 @@ public class CacheStartOnJoinTest extends GridCommonAbstractTest {
return ccfg;
}
+ /**
+ * @param node Node.
+ * @param cacheName Cache name.
+ */
+ private void checkCache(Ignite node, final String cacheName) {
+ assertNotNull(((IgniteKernal)node).context().cache().cache(cacheName));
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
index 4864a67..bf5ba61 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
@@ -26,6 +26,7 @@ import javax.cache.Cache;
import javax.cache.configuration.Factory;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
+import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.cache.store.CacheStoreSession;
@@ -35,6 +36,7 @@ import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.resources.CacheStoreSessionResource;
+import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.jetbrains.annotations.Nullable;
@@ -357,9 +359,12 @@ public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest {
*
*/
private static class FirstStoreFactory implements Factory<CacheStore> {
+ @IgniteInstanceResource
+ private Ignite ignite;
+
/** {@inheritDoc} */
@Override public CacheStore create() {
- String igniteInstanceName = startingIgniteInstanceName.get();
+ String igniteInstanceName = ignite.name();
CacheStore store = firstStores.get(igniteInstanceName);
@@ -374,9 +379,12 @@ public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest {
*
*/
private static class SecondStoreFactory implements Factory<CacheStore> {
+ @IgniteInstanceResource
+ private Ignite ignite;
+
/** {@inheritDoc} */
@Override public CacheStore create() {
- String igniteInstanceName = startingIgniteInstanceName.get();
+ String igniteInstanceName = ignite.name();
CacheStore store = secondStores.get(igniteInstanceName);
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedBasicStoreMultiNodeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedBasicStoreMultiNodeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedBasicStoreMultiNodeSelfTest.java
index 372da32..a7128e0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedBasicStoreMultiNodeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedBasicStoreMultiNodeSelfTest.java
@@ -22,7 +22,9 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import javax.cache.configuration.Factory;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
@@ -102,11 +104,7 @@ public class GridCachePartitionedBasicStoreMultiNodeSelfTest extends GridCommonA
cc.setAtomicityMode(TRANSACTIONAL);
cc.setBackups(1);
- GridCacheTestStore store = new GridCacheTestStore();
-
- stores.add(store);
-
- cc.setCacheStoreFactory(singletonFactory(store));
+ cc.setCacheStoreFactory(new StoreFactory());
cc.setReadThrough(true);
cc.setWriteThrough(true);
cc.setLoadPreviousValue(true);
@@ -269,4 +267,18 @@ public class GridCachePartitionedBasicStoreMultiNodeSelfTest extends GridCommonA
assertEquals(expPutAll, putAll);
assertEquals(expTxs, txs);
}
+
+ /**
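+ * Store factory that creates a new {@link GridCacheTestStore} per call and adds it to {@code stores}.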
+ */
+ static class StoreFactory implements Factory<CacheStore> {
+ /** {@inheritDoc} */
+ @Override public CacheStore create() {
+ GridCacheTestStore store = new GridCacheTestStore();
+
+ stores.add(store);
+
+ return store;
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
index 0f4aa87..2096179 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
@@ -18,8 +18,10 @@
package org.apache.ignite.loadtests.hashmap;
import java.util.IdentityHashMap;
+import java.util.UUID;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager;
import org.apache.ignite.internal.processors.cache.CacheOsConflictResolutionManager;
import org.apache.ignite.internal.processors.cache.CacheType;
@@ -78,6 +80,8 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> {
),
defaultCacheConfiguration(),
CacheType.USER,
+ AffinityTopologyVersion.ZERO,
+ UUID.randomUUID(),
true,
true,
null,