You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/03/04 04:33:55 UTC
[49/50] incubator-ignite git commit: IGNITE-45 - WIP
IGNITE-45 - WIP
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/30d96ad5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/30d96ad5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/30d96ad5
Branch: refs/heads/ignite-45
Commit: 30d96ad594b2d9372a400847068e803bd3c9631a
Parents: 55a9c50
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Tue Mar 3 19:21:08 2015 -0800
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Tue Mar 3 19:21:08 2015 -0800
----------------------------------------------------------------------
.../discovery/GridDiscoveryManager.java | 66 +++++++++++++++++---
.../affinity/AffinityTopologyVersion.java | 23 ++++++-
.../affinity/GridAffinityAssignmentCache.java | 6 ++
.../cache/DynamicCacheDescriptor.java | 28 ++++++---
.../GridCachePartitionExchangeManager.java | 54 ++++++++++------
.../processors/cache/GridCacheProcessor.java | 36 ++++++++---
.../cache/GridCacheSharedContext.java | 7 +++
.../dht/GridDhtPartitionTopologyImpl.java | 2 +-
.../preloader/GridDhtPartitionExchangeId.java | 5 +-
.../GridDhtPartitionsExchangeFuture.java | 34 +++++++++-
.../spi/discovery/tcp/TcpDiscoverySpi.java | 1 +
.../cache/IgniteDynamicCacheStartSelfTest.java | 13 ++++
12 files changed, 219 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30d96ad5/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index dce04e2..d891149 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -166,6 +166,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/** Custom event listener. */
private GridPlainInClosure<Serializable> customEvtLsnr;
+ /** Map of dynamic cache filters. */
+ private Map<String, IgnitePredicate<ClusterNode>> dynamicCacheFilters = new HashMap<>();
+
/** @param ctx Context. */
public GridDiscoveryManager(GridKernalContext ctx) {
super(ctx, ctx.config().getDiscoverySpi());
@@ -214,6 +217,18 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
getSpi().setNodeAttributes(attrs, ver);
}
+ /**
+ * Adds dynamic cache filters.
+ *
+ * @param cacheName Cache name.
+ * @param filter Cache filter.
+ */
+ public void addDynamicCacheFilter(String cacheName, IgnitePredicate<ClusterNode> filter) {
+ IgnitePredicate<ClusterNode> old = dynamicCacheFilters.put(cacheName, filter);
+
+ assert old == null;
+ }
+
/** {@inheritDoc} */
@Override public void start() throws IgniteCheckedException {
discoOrdered = discoOrdered();
@@ -277,10 +292,19 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
c.updateAlives(node);
}
+ if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) {
+ try {
+ customEvtLsnr.apply(data);
+ }
+ catch (Exception e) {
+ U.error(log, "Failed to notify direct custom event listener: " + data, e);
+ }
+ }
+
// Put topology snapshot into discovery history.
// There is no race possible between history maintenance and concurrent discovery
// event notifications, since SPI notifies manager about all events from this listener.
- if (type != EVT_NODE_METRICS_UPDATED && type != DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) {
+ if (type != EVT_NODE_METRICS_UPDATED) {
DiscoCache cache = new DiscoCache(locNode, F.view(topSnapshot, F.remoteNodes(locNode.id())));
discoCacheHist.put(topVer, cache);
@@ -307,15 +331,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
return;
}
- if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) {
- try {
- customEvtLsnr.apply(data);
- }
- catch (Exception e) {
- U.error(log, "Failed to notify direct custom event listener: " + data, e);
- }
- }
-
if (topVer > 0 && (type == EVT_NODE_JOINED || type == EVT_NODE_FAILED || type == EVT_NODE_LEFT)) {
boolean set = GridDiscoveryManager.this.topVer.setIfGreater(topVer);
@@ -1834,6 +1849,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
GridCacheAttributes[] caches = node.attribute(ATTR_CACHE);
+ boolean hasCaches = false;
+
if (caches != null) {
nodesWithCaches.add(node);
@@ -1860,6 +1877,35 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
}
+ hasCaches = true;
+ }
+
+ for (Map.Entry<String, IgnitePredicate<ClusterNode>> entry : dynamicCacheFilters.entrySet()) {
+ String cacheName = entry.getKey();
+ IgnitePredicate<ClusterNode> filter = entry.getValue();
+
+ if (filter.apply(node)) {
+ addToMap(cacheMap, cacheName, node);
+
+ if (alive(node.id()))
+ addToMap(aliveCacheNodes, maskNull(cacheName), node);
+
+ addToMap(dhtNodesMap, cacheName, node);
+
+ // TODO IGNITE-45 client and near caches.
+
+ if (!loc.id().equals(node.id())) {
+ addToMap(rmtCacheMap, cacheName, node);
+
+ if (alive(node.id()))
+ addToMap(aliveRmtCacheNodes, maskNull(cacheName), node);
+ }
+
+ hasCaches = true;
+ }
+ }
+
+ if (hasCaches) {
if (alive(node.id())) {
aliveNodesWithCaches.add(node);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30d96ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
index e276253..cb24ecd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
@@ -89,17 +89,34 @@ public class AffinityTopologyVersion implements Comparable<AffinityTopologyVersi
/** {@inheritDoc} */
@Override public int compareTo(AffinityTopologyVersion o) {
- return Long.compare(topVer, o.topVer);
+ int cmp = Long.compare(topVer, o.topVer);
+
+ if (cmp == 0)
+ return Integer.compare(minorTopVer, o.minorTopVer);
+
+ return cmp;
}
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
- return o instanceof AffinityTopologyVersion && topVer == ((AffinityTopologyVersion)o).topVer;
+ if (this == o)
+ return true;
+
+ if (!(o instanceof AffinityTopologyVersion))
+ return false;
+
+ AffinityTopologyVersion that = (AffinityTopologyVersion)o;
+
+ return minorTopVer == that.minorTopVer && topVer == that.topVer;
}
/** {@inheritDoc} */
@Override public int hashCode() {
- return (int)topVer;
+ int result = (int)(topVer ^ (topVer >>> 32));
+
+ result = 31 * result + minorTopVer;
+
+ return result;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30d96ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index b964f83..4bb5885 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -150,6 +150,8 @@ public class GridAffinityAssignmentCache {
if (log.isDebugEnabled())
log.debug("Calculating affinity [topVer=" + topVer + ", locNodeId=" + ctx.localNodeId() +
", discoEvt=" + discoEvt + ']');
+ U.debug(log, "Calculating affinity [topVer=" + topVer + ", locNodeId=" + ctx.localNodeId() +
+ ", discoEvt=" + discoEvt + ']');
GridAffinityAssignment prev = affCache.get(topVer.previous());
@@ -162,6 +164,8 @@ public class GridAffinityAssignmentCache {
// Resolve nodes snapshot for specified topology version.
Collection<ClusterNode> nodes = ctx.discovery().cacheAffinityNodes(cacheName, topVer.topologyVersion());
+ U.debug(log, "Affinity nodes: " + nodes);
+
sorted = sort(nodes);
}
@@ -187,6 +191,8 @@ public class GridAffinityAssignmentCache {
GridAffinityAssignment updated = new GridAffinityAssignment(topVer, assignment);
+ U.debug(log, "Updated assignment: " + updated);
+
updated = F.addIfAbsent(affCache, topVer, updated);
// Update top version, if required.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30d96ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index 196730c..6a6e227 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.cache;
+import org.apache.ignite.cluster.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.internal.*;
@@ -28,28 +29,35 @@ import java.io.*;
* Cache start descriptor.
*/
public class DynamicCacheDescriptor implements Serializable {
+ /** Cache start ID. */
+ private IgniteUuid startId;
+
/** Cache configuration. */
@GridToStringExclude
private CacheConfiguration cacheCfg;
/** Deploy filter bytes. */
@GridToStringExclude
- private byte[] deployFltrBytes;
-
- /** Cache start ID. */
- private IgniteUuid startId;
+ private IgnitePredicate<ClusterNode> nodeFilter;
/**
* @param cacheCfg Cache configuration.
- * @param deployFltrBytes Deployment filter bytes.
+ * @param nodeFilter Node filter.
*/
- public DynamicCacheDescriptor(CacheConfiguration cacheCfg, byte[] deployFltrBytes, IgniteUuid startId) {
+ public DynamicCacheDescriptor(CacheConfiguration cacheCfg, IgnitePredicate<ClusterNode> nodeFilter, IgniteUuid startId) {
this.cacheCfg = cacheCfg;
- this.deployFltrBytes = deployFltrBytes;
+ this.nodeFilter = nodeFilter;
this.startId = startId;
}
/**
+ * @return Start ID.
+ */
+ public IgniteUuid startId() {
+ return startId;
+ }
+
+ /**
* @return Cache configuration.
*/
public CacheConfiguration cacheConfiguration() {
@@ -57,10 +65,10 @@ public class DynamicCacheDescriptor implements Serializable {
}
/**
- * @return Start ID.
+ * @return Node filter.
*/
- public IgniteUuid startId() {
- return startId;
+ public IgnitePredicate<ClusterNode> nodeFilter() {
+ return nodeFilter;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30d96ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 76ecea4..d38161e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cluster.*;
import org.apache.ignite.events.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.events.*;
import org.apache.ignite.internal.managers.eventstorage.*;
import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.processors.cache.distributed.dht.*;
@@ -114,6 +115,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
final ClusterNode n = e.eventNode();
+ GridDhtPartitionExchangeId exchId = null;
+ GridDhtPartitionsExchangeFuture<K, V> exchFut = null;
+
if (e.type() != EVT_DISCOVERY_CUSTOM_EVT) {
assert !loc.id().equals(n.id());
@@ -129,12 +133,30 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
"Node joined with smaller-than-local " +
"order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']';
- GridDhtPartitionExchangeId exchId = exchangeId(n.id(),
+ exchId = exchangeId(n.id(),
new AffinityTopologyVersion(e.topologyVersion(), minorTopVer = 0),
e.type());
- GridDhtPartitionsExchangeFuture<K, V> exchFut = exchangeFuture(exchId, e);
+ exchFut = exchangeFuture(exchId, e, null);
+ }
+ else {
+ DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)e;
+
+ if (customEvt.data() instanceof DynamicCacheDescriptor) {
+ DynamicCacheDescriptor desc = (DynamicCacheDescriptor)customEvt.data();
+
+ // Check if this event should trigger partition exchange.
+ if (cctx.cache().dynamicCacheRegistered(desc)) {
+ exchId = exchangeId(n.id(),
+ new AffinityTopologyVersion(e.topologyVersion(), ++minorTopVer),
+ e.type());
+
+ exchFut = exchangeFuture(exchId, e, desc);
+ }
+ }
+ }
+ if (exchId != null) {
// Start exchange process.
pendingExchangeFuts.add(exchFut);
@@ -161,9 +183,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
});
}
- else {
- // TODO.
- }
}
finally {
leaveBusy();
@@ -225,7 +244,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
assert discoEvt.topologyVersion() == startTopVer.topologyVersion();
- GridDhtPartitionsExchangeFuture<K, V> fut = exchangeFuture(exchId, discoEvt);
+ GridDhtPartitionsExchangeFuture<K, V> fut = exchangeFuture(exchId, discoEvt, null);
new IgniteThread(cctx.gridName(), "exchange-worker", exchWorker).start();
@@ -399,16 +418,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
- * Callback to start exchange for dynamically started cache.
- *
- * @param cacheDesc Cache descriptor.
- */
- public void onCacheDeployed(DynamicCacheDescriptor cacheDesc) {
- // TODO IGNITE-45 move to exchange future.
- cctx.kernalContext().cache().onCacheStartFinished(cacheDesc);
- }
-
- /**
* @return {@code True} if topology has changed.
*/
public boolean topologyChanged() {
@@ -579,11 +588,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
* @return Exchange future.
*/
GridDhtPartitionsExchangeFuture<K, V> exchangeFuture(GridDhtPartitionExchangeId exchId,
- @Nullable DiscoveryEvent discoEvt) {
+ @Nullable DiscoveryEvent discoEvt, @Nullable DynamicCacheDescriptor startDesc) {
GridDhtPartitionsExchangeFuture<K, V> fut;
GridDhtPartitionsExchangeFuture<K, V> old = exchFuts.addx(
- fut = new GridDhtPartitionsExchangeFuture<>(cctx, busyLock, exchId));
+ fut = new GridDhtPartitionsExchangeFuture<>(cctx, busyLock, exchId, startDesc));
if (old != null)
fut = old;
@@ -606,6 +615,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
fut.cleanUp();
}
}
+
+ DynamicCacheDescriptor desc = exchFut.dynamicCacheDescriptor();
+
+ if (desc != null)
+ cctx.cache().onCacheStartFinished(desc);
}
/**
@@ -654,7 +668,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
refreshPartitions();
}
else
- exchangeFuture(msg.exchangeId(), null).onReceive(node.id(), msg);
+ exchangeFuture(msg.exchangeId(), null, null).onReceive(node.id(), msg);
}
finally {
leaveBusy();
@@ -692,7 +706,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
scheduleResendPartitions();
}
else
- exchangeFuture(msg.exchangeId(), null).onReceive(node.id(), msg);
+ exchangeFuture(msg.exchangeId(), null, null).onReceive(node.id(), msg);
}
finally {
leaveBusy();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30d96ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 069930e..a22c9a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1170,6 +1170,30 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
+ * @param desc Descriptor to check.
+ * @return {@code True} if cache was registered for start and exchange future should be created.
+ */
+ public boolean dynamicCacheRegistered(DynamicCacheDescriptor desc) {
+ return dynamicCaches.get(desc.cacheConfiguration().getName()) == desc;
+ }
+
+ /**
+ * @param startDesc Start descriptor.
+ */
+ public void onCacheStartExchange(DynamicCacheDescriptor startDesc) throws IgniteCheckedException {
+ CacheConfiguration cfg = new CacheConfiguration(startDesc.cacheConfiguration());
+
+ initialize(cfg);
+
+ GridCacheContext cacheCtx = createCache(cfg);
+
+ sharedCtx.addCacheContext(cacheCtx);
+
+ startCache(cacheCtx.cache());
+ onKernalStart(cacheCtx.cache());
+ }
+
+ /**
* Callback invoked when first exchange future for dynamic cache is completed.
*
* @param startDesc Cache start descriptor.
@@ -1256,8 +1280,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
DynamicCacheStartFuture fut = new DynamicCacheStartFuture(ctx, IgniteUuid.fromUuid(ctx.localNodeId()));
try {
- byte[] filterBytes = ctx.config().getMarshaller().marshal(nodeFilter);
-
for (CacheConfiguration ccfg0 : ctx.config().getCacheConfiguration()) {
if (ccfg0.getName().equals(ccfg.getName()))
return new GridFinishedFutureEx<>(new IgniteCheckedException("Failed to start cache " +
@@ -1274,11 +1296,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
return new GridFinishedFutureEx<>(new IgniteCheckedException("Failed to start cache " +
"(a cache with the same name is already started): " + ccfg.getName()));
- ctx.discovery().sendCustomEvent(new DynamicCacheDescriptor(ccfg, filterBytes, fut.startId()));
+ ctx.discovery().sendCustomEvent(new DynamicCacheDescriptor(ccfg, nodeFilter, fut.startId()));
return fut;
}
- catch (IgniteCheckedException e) {
+ catch (Exception e) {
fut.onDone(e);
// Safety.
@@ -1315,11 +1337,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
DynamicCacheDescriptor old = dynamicCaches.put(ccfg.getName(), startDesc);
- assert old == null : "Dynamic cache map was concurrently modified [new=" + startDesc + ", old=" + old + ']';
-
- // TODO IGNITE-45 create cache context here.
+ ctx.discovery().addDynamicCacheFilter(ccfg.getName(), startDesc.nodeFilter());
- sharedCtx.exchange().onCacheDeployed(startDesc);
+ assert old == null : "Dynamic cache map was concurrently modified [new=" + startDesc + ", old=" + old + ']';
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30d96ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index e133a17..aadb153 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -115,6 +115,13 @@ public class GridCacheSharedContext<K, V> {
}
/**
+ * @return Cache processor.
+ */
+ public GridCacheProcessor cache() {
+ return kernalCtx.cache();
+ }
+
+ /**
* Adds cache context to shared cache context.
*
* @param cacheCtx Cache context to add.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30d96ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 4af7534..e86996d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -209,7 +209,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology<K,
assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" +
topVer + ", exchId=" + exchId + ']';
- if (!exchId.isJoined())
+ if (exchId.isLeft())
removeNode(exchId.nodeId());
// In case if node joins, get topology at the time of joining node.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30d96ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
index d101efd..1145bdb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+import org.apache.ignite.internal.events.*;
import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.internal.*;
@@ -28,6 +29,7 @@ import java.nio.*;
import java.util.*;
import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
/**
* Exchange ID.
@@ -54,7 +56,8 @@ public class GridDhtPartitionExchangeId implements Message, Comparable<GridDhtPa
*/
public GridDhtPartitionExchangeId(UUID nodeId, int evt, @NotNull AffinityTopologyVersion topVer) {
assert nodeId != null;
- assert evt == EVT_NODE_LEFT || evt == EVT_NODE_FAILED || evt == EVT_NODE_JOINED;
+ assert evt == EVT_NODE_LEFT || evt == EVT_NODE_FAILED || evt == EVT_NODE_JOINED ||
+ evt == EVT_DISCOVERY_CUSTOM_EVT;
assert topVer.topologyVersion() > 0;
this.nodeId = nodeId;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30d96ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 81ab4bf..de87904 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -142,6 +142,9 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Aff
/** Logger. */
private IgniteLogger log;
+ /** Dynamic cache start descriptor. */
+ private DynamicCacheDescriptor startDesc;
+
/**
* Dummy future created to trigger reassignments if partition
* topology changed while preloading.
@@ -197,8 +200,12 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Aff
* @param busyLock Busy lock.
* @param exchId Exchange ID.
*/
- public GridDhtPartitionsExchangeFuture(GridCacheSharedContext<K, V> cctx, ReadWriteLock busyLock,
- GridDhtPartitionExchangeId exchId) {
+ public GridDhtPartitionsExchangeFuture(
+ GridCacheSharedContext<K, V> cctx,
+ ReadWriteLock busyLock,
+ GridDhtPartitionExchangeId exchId,
+ DynamicCacheDescriptor startDesc
+ ) {
super(cctx.kernalContext());
syncNotify(true);
@@ -213,6 +220,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Aff
this.cctx = cctx;
this.busyLock = busyLock;
this.exchId = exchId;
+ this.startDesc = startDesc;
log = cctx.logger(getClass());
@@ -379,6 +387,13 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Aff
}
/**
+ * @return Dynamic cache descriptor.
+ */
+ public DynamicCacheDescriptor dynamicCacheDescriptor() {
+ return startDesc;
+ }
+
+ /**
* @return Init future.
*/
IgniteInternalFuture<?> initFuture() {
@@ -422,6 +437,9 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Aff
// will return corresponding nodes.
U.await(evtLatch);
+ if (startDesc != null)
+ startCache();
+
assert discoEvt != null;
assert exchId.nodeId().equals(discoEvt.eventNode().id());
@@ -433,7 +451,8 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Aff
}
// Grab all alive remote nodes with order of equal or less than last joined node.
- rmtNodes = new ConcurrentLinkedQueue<>(CU.aliveRemoteCacheNodes(cctx, exchId.topologyVersion().topologyVersion()));
+ rmtNodes = new ConcurrentLinkedQueue<>(CU.aliveRemoteCacheNodes(cctx,
+ exchId.topologyVersion().topologyVersion()));
rmtIds = Collections.unmodifiableSet(new HashSet<>(F.nodeIds(rmtNodes)));
@@ -547,6 +566,15 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Aff
}
/**
+ * Starts dynamic cache.
+ */
+ private void startCache() throws IgniteCheckedException {
+ assert startDesc != null;
+
+ ctx.cache().onCacheStartExchange(startDesc);
+ }
+
+ /**
* @param node Node.
* @param id ID.
* @throws IgniteCheckedException If failed.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30d96ad5/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 34995ba..81b128e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -4481,6 +4481,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
}
msg.verify(getLocalNodeId());
+ msg.topologyVersion(ring.topologyVersion());
}
if (msg.verified()) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30d96ad5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
index 5d515e3..efb5db2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
@@ -136,4 +136,17 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
assertEquals(1, succeeded);
assertEquals(threadNum - 1, failed);
}
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testStartCacheSimple() throws Exception {
+ final IgniteKernal kernal = (IgniteKernal)grid(0);
+
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setName("TestCacheName3");
+
+ kernal.context().cache().dynamicStartCache(ccfg, F.<ClusterNode>alwaysTrue()).get();
+ }
}