You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dg...@apache.org on 2019/04/18 15:47:39 UTC
[ignite] branch master updated: IGNITE-11188 Optimize baseline
autoadjustment for in-memory clusters with zero timeout
This is an automated email from the ASF dual-hosted git repository.
dgovorukhin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 183c6a4 IGNITE-11188 Optimize baseline autoadjustment for in-memory clusters with zero timeout
183c6a4 is described below
commit 183c6a4b761af33013d3361621bc3fc21a2eb7e1
Author: ibessonov <be...@gmail.com>
AuthorDate: Thu Apr 18 18:47:19 2019 +0300
IGNITE-11188 Optimize baseline autoadjustment for in-memory clusters with zero timeout
---
.../org/apache/ignite/internal/IgniteKernal.java | 3 +
.../cluster/DistributedBaselineConfiguration.java | 19 +--
.../ignite/internal/cluster/IgniteClusterImpl.java | 26 ++---
.../managers/discovery/GridDiscoveryManager.java | 102 ++++++++++++++--
.../affinity/GridAffinityAssignmentCache.java | 106 +++++++++--------
.../cache/CacheAffinitySharedManager.java | 8 +-
.../processors/cache/CacheGroupContext.java | 4 +-
.../cache/GridCachePartitionExchangeManager.java | 7 +-
.../processors/cache/GridCacheSharedContext.java | 3 +
.../internal/processors/cache/GridCacheUtils.java | 31 +++++
.../preloader/GridDhtPartitionsExchangeFuture.java | 26 ++---
.../preloader/GridDhtPartitionsSingleMessage.java | 3 +-
.../cluster/DiscoveryDataClusterState.java | 21 ++++
.../cluster/GridClusterStateProcessor.java | 128 +++++++++++++++++++--
.../baseline/autoadjust/ChangeTopologyWatcher.java | 14 +--
.../DistributedMetaStorageCasMessage.java | 2 +-
.../persistence/DistributedMetaStorageImpl.java | 11 +-
.../DistributedMetaStorageUpdateMessage.java | 3 +
.../apache/ignite/internal/util/IgniteUtils.java | 9 +-
.../spi/discovery/IgniteDiscoveryThread.java | 4 +
.../ignite/spi/discovery/tcp/ServerImpl.java | 35 +-----
.../cache/IgniteClusterActivateDeactivateTest.java | 11 +-
...niteDynamicCacheStartNoExchangeTimeoutTest.java | 54 ++++-----
...lientNodeBinaryObjectMetadataMultinodeTest.java | 8 +-
.../distributed/CacheBaselineTopologyTest.java | 53 ++-------
.../CacheLateAffinityAssignmentTest.java | 31 +++--
.../GridCacheAbstractNodeRestartSelfTest.java | 1 -
.../cluster/BaselineAutoAdjustInMemoryTest.java | 64 +++++++++++
.../processors/cluster/BaselineAutoAdjustTest.java | 79 ++++++++-----
.../DistributedMetaStoragePersistentTest.java | 67 -----------
.../metastorage/DistributedMetaStorageTest.java | 70 +++++++++++
.../ServiceDeploymentOutsideBaselineTest.java | 5 +-
.../IgniteDiscoveryCacheReuseSelfTest.java | 3 +
.../testsuites/IgniteKernalSelfTestSuite.java | 3 +-
.../zk/internal/ZookeeperDiscoveryImpl.java | 21 ----
35 files changed, 654 insertions(+), 381 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index c3c725d..42a33d0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -147,6 +147,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopProcessorAdapter;
import org.apache.ignite.internal.processors.job.GridJobProcessor;
import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor;
import org.apache.ignite.internal.processors.marshaller.GridMarshallerMappingProcessor;
+import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage;
import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
import org.apache.ignite.internal.processors.nodevalidation.DiscoveryNodeValidationProcessor;
import org.apache.ignite.internal.processors.nodevalidation.OsDiscoveryNodeValidationProcessor;
@@ -1048,6 +1049,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
ctx.cache().context().database().notifyMetaStorageSubscribersOnReadyForRead();
+ ((DistributedMetaStorageImpl)ctx.distributedMetastorage()).inMemoryReadyForRead();
+
ctx.cache().context().database().startMemoryRestore(ctx);
ctx.recoveryMode(false);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/DistributedBaselineConfiguration.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/DistributedBaselineConfiguration.java
index 37a6e7b..5b891d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/DistributedBaselineConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/DistributedBaselineConfiguration.java
@@ -54,21 +54,21 @@ public class DistributedBaselineConfiguration {
private static final String PROPERTY_UPDATE_MESSAGE =
"Baseline parameter '%s' was changed from '%s' to '%s'";
/** */
- private volatile long dfltTimeout = DEFAULT_PERSISTENCE_TIMEOUT;
+ private volatile long dfltTimeout;
/** Default auto-adjust enable/disable. */
- private volatile boolean dfltEnabled = false;
+ private volatile boolean dfltEnabled;
/** */
private final GridKernalContext ctx;
/** */
private final IgniteLogger log;
/** Value of manual baseline control or auto adjusting baseline. */
- private volatile DistributedBooleanProperty baselineAutoAdjustEnabled =
+ private final DistributedBooleanProperty baselineAutoAdjustEnabled =
detachedBooleanProperty("baselineAutoAdjustEnabled");
/**
* Value of time which we would wait before the actual topology change since last discovery event(node join/exit).
*/
- private volatile DistributedLongProperty baselineAutoAdjustTimeout =
+ private final DistributedLongProperty baselineAutoAdjustTimeout =
detachedLongProperty("baselineAutoAdjustTimeout");
/**
@@ -81,13 +81,14 @@ public class DistributedBaselineConfiguration {
IgniteLogger log) {
this.ctx = ctx;
this.log = log;
- isp.registerDistributedConfigurationListener(
- dispatcher -> {
- boolean persistenceEnabled = ctx.config() != null && CU.isPersistenceEnabled(ctx.config());
- dfltTimeout = persistenceEnabled ? DEFAULT_PERSISTENCE_TIMEOUT : DEFAULT_IN_MEMORY_TIMEOUT;
- dfltEnabled = !persistenceEnabled;
+ boolean persistenceEnabled = ctx.config() != null && CU.isPersistenceEnabled(ctx.config());
+
+ dfltTimeout = persistenceEnabled ? DEFAULT_PERSISTENCE_TIMEOUT : DEFAULT_IN_MEMORY_TIMEOUT;
+ dfltEnabled = getBoolean(IGNITE_BASELINE_AUTO_ADJUST_ENABLED, !persistenceEnabled);
+ isp.registerDistributedConfigurationListener(
+ dispatcher -> {
baselineAutoAdjustEnabled.addListener(makeUpdateListener(dfltEnabled));
baselineAutoAdjustTimeout.addListener(makeUpdateListener(dfltTimeout));
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
index eabf701..a8a8d5e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
@@ -323,14 +323,7 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus
/** */
private Collection<BaselineNode> baselineNodes() {
- Collection<ClusterNode> srvNodes = ctx.cluster().get().forServers().nodes();
-
- ArrayList baselineNodes = new ArrayList(srvNodes.size());
-
- for (ClusterNode clN : srvNodes)
- baselineNodes.add(clN);
-
- return baselineNodes;
+ return new ArrayList<>(ctx.cluster().get().forServers().nodes());
}
/** {@inheritDoc} */
@@ -352,9 +345,6 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus
guard();
try {
- if (isInMemoryMode())
- return;
-
validateBeforeBaselineChange(baselineTop);
ctx.state().changeGlobalState(true, baselineTop, true).get();
@@ -378,11 +368,6 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus
setBaselineTopology(topVer, true);
}
- /** */
- private boolean isInMemoryMode() {
- return !CU.isPersistenceEnabled(cfg);
- }
-
/**
* Verifies all nodes in current cluster topology support BaselineTopology feature so compatibilityMode flag is
* enabled to reset.
@@ -473,13 +458,16 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus
setBaselineTopology(topVer, false);
}
+ /**
+ * Set baseline topology.
+ *
+ * @param topVer Topology version.
+ * @param isBaselineAutoAdjust Whether this is an automatic update or not.
+ */
private void setBaselineTopology(long topVer, boolean isBaselineAutoAdjust) {
guard();
try {
- if (isInMemoryMode())
- return;
-
Collection<ClusterNode> top = topology(topVer);
if (top == null)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 1b3e89f..5fcf934 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -570,6 +570,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
if (ctx.config().getCommunicationFailureResolver() != null)
ctx.resource().injectGeneric(ctx.config().getCommunicationFailureResolver());
+ // Shared reference between DiscoverySpiListener and DiscoverySpiDataExchange.
+ AtomicReference<IgniteFuture<?>> lastStateChangeEvtLsnrFutRef = new AtomicReference<>();
+
spi.setListener(new DiscoverySpiListener() {
private long gridStartTime;
@@ -602,7 +605,23 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
});
- return new IgniteFutureImpl<>(notificationFut);
+ IgniteFuture<?> fut = new IgniteFutureImpl<>(notificationFut);
+
+ //TODO could be optimized with more specific conditions.
+ switch (type) {
+ case EVT_NODE_JOINED:
+ case EVT_NODE_LEFT:
+ case EVT_NODE_FAILED:
+ if (!CU.isPersistenceEnabled(ctx.config()))
+ lastStateChangeEvtLsnrFutRef.set(fut);
+
+ break;
+
+ case EVT_DISCOVERY_CUSTOM_EVT:
+ lastStateChangeEvtLsnrFutRef.set(fut);
+ }
+
+ return fut;
}
/**
@@ -679,12 +698,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
else if (customMsg instanceof ChangeGlobalStateFinishMessage) {
ctx.state().onStateFinishMessage((ChangeGlobalStateFinishMessage)customMsg);
- Snapshot snapshot = topSnap.get();
-
- // Topology version does not change, but need create DiscoCache with new state.
- DiscoCache discoCache = snapshot.discoCache.copy(snapshot.topVer, ctx.state().clusterState());
-
- topSnap.set(new Snapshot(snapshot.topVer, discoCache));
+ updateTopologySnapshot();
incMinorTopVer = false;
}
@@ -766,6 +780,24 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
// Current version.
discoCache = discoCache();
+ if (locJoinEvt || !node.isClient() && !node.isDaemon()) {
+ if (type == EVT_NODE_LEFT || type == EVT_NODE_FAILED || type == EVT_NODE_JOINED) {
+ boolean discoCacheUpdated = ctx.state().autoAdjustInMemoryClusterState(
+ node.id(),
+ topSnapshot,
+ discoCache,
+ topVer,
+ minorTopVer
+ );
+
+ if (discoCacheUpdated) {
+ discoCache = discoCache();
+
+ discoCacheHist.put(nextTopVer, discoCache);
+ }
+ }
+ }
+
// If this is a local join event, just save it and do not notify listeners.
if (locJoinEvt) {
if (gridStartTime == 0)
@@ -854,12 +886,14 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
ctx.service().onLocalJoin(localJoinEvent(), discoCache);
+ DiscoCache discoCache0 = discoCache;
+
ctx.cluster().clientReconnectFuture().listen(new CI1<IgniteFuture<?>>() {
@Override public void apply(IgniteFuture<?> fut) {
try {
fut.get();
- discoWrk.addEvent(EVT_CLIENT_NODE_RECONNECTED, nextTopVer, node, discoCache, topSnapshot, null);
+ discoWrk.addEvent(EVT_CLIENT_NODE_RECONNECTED, nextTopVer, node, discoCache0, topSnapshot, null);
}
catch (IgniteException ignore) {
// No-op.
@@ -888,6 +922,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
c.collectJoiningNodeData(dataBag);
}
else {
+ waitForLastStateChangeEventFuture();
+
for (GridComponent c : ctx.components())
c.collectGridNodeData(dataBag);
}
@@ -933,6 +969,34 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
}
}
+
+ /** */
+ private void waitForLastStateChangeEventFuture() {
+ IgniteFuture<?> lastStateChangeEvtLsnrFut = lastStateChangeEvtLsnrFutRef.get();
+
+ if (lastStateChangeEvtLsnrFut != null) {
+ Thread currThread = Thread.currentThread();
+
+ GridWorker worker = currThread instanceof IgniteDiscoveryThread
+ ? ((IgniteDiscoveryThread)currThread).worker()
+ : null;
+
+ if (worker != null)
+ worker.blockingSectionBegin();
+
+ try {
+ lastStateChangeEvtLsnrFut.get();
+ }
+ finally {
+ // Guaranteed to be invoked in the same thread as DiscoverySpiListener#onDiscovery.
+ // No additional synchronization for reference is required.
+ lastStateChangeEvtLsnrFutRef.set(null);
+
+ if (worker != null)
+ worker.blockingSectionEnd();
+ }
+ }
+ }
});
new DiscoveryMessageNotifierThread(discoNtfWrk).start();
@@ -971,6 +1035,18 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
/**
+ * Update {@link #topSnap} with the latest cluster state.
+ */
+ public void updateTopologySnapshot() {
+ Snapshot snapshot = topSnap.get();
+
+ // Topology version does not change, but we need to create a DiscoCache with the new state.
+ DiscoCache discoCache = snapshot.discoCache.copy(snapshot.topVer, ctx.state().clusterState());
+
+ topSnap.set(new Snapshot(snapshot.topVer, discoCache));
+ }
+
+ /**
* @param type Message type.
* @param customMsg Custom message.
* @return {@code True} if should not process message.
@@ -2642,9 +2718,19 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/** */
private class DiscoveryMessageNotifierThread extends IgniteThread implements IgniteDiscoveryThread {
+ /** */
+ private final GridWorker worker;
+
/** {@inheritDoc} */
public DiscoveryMessageNotifierThread(GridWorker worker) {
super(worker);
+
+ this.worker = worker;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridWorker worker() {
+ return worker;
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index 95e26b8..7c2b1dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -119,9 +119,6 @@ public class GridAffinityAssignmentCache {
/** */
private final boolean locCache;
- /** */
- private final boolean persistentCache;
-
/** Node stop flag. */
private volatile IgniteCheckedException stopErr;
@@ -148,9 +145,8 @@ public class GridAffinityAssignmentCache {
AffinityFunction aff,
IgnitePredicate<ClusterNode> nodeFilter,
int backups,
- boolean locCache,
- boolean persistentCache)
- {
+ boolean locCache
+ ) {
assert ctx != null;
assert aff != null;
assert nodeFilter != null;
@@ -163,7 +159,6 @@ public class GridAffinityAssignmentCache {
this.grpId = grpId;
this.backups = backups;
this.locCache = locCache;
- this.persistentCache = persistentCache;
log = ctx.log(GridAffinityAssignmentCache.class);
@@ -327,11 +322,14 @@ public class GridAffinityAssignmentCache {
boolean hasBaseline = false;
boolean changedBaseline = false;
+ BaselineTopology blt = null;
+
if (discoCache != null) {
- hasBaseline = discoCache.state().baselineTopology() != null && persistentCache;
+ blt = discoCache.state().baselineTopology();
+
+ hasBaseline = blt != null;
- changedBaseline = !hasBaseline ? baselineTopology != null :
- !discoCache.state().baselineTopology().equals(baselineTopology);
+ changedBaseline = !hasBaseline ? baselineTopology != null : !blt.equals(baselineTopology);
}
IdealAffinityAssignment assignment;
@@ -351,19 +349,8 @@ public class GridAffinityAssignmentCache {
}
}
- if (skipCalculation)
- assignment = prevAssignment;
- else if (hasBaseline && !changedBaseline) {
- if (baselineAssignment == null) {
- List<ClusterNode> baselineAffinityNodes = discoCache.state().baselineTopology()
- .createBaselineView(sorted, nodeFilter);
-
- List<List<ClusterNode>> calculated = aff.assignPartitions(new GridAffinityFunctionContextImpl(
- baselineAffinityNodes, prevAssignment != null ? prevAssignment.assignment() : null,
- events.lastEvent(), topVer, backups));
-
- baselineAssignment = IdealAffinityAssignment.create(topVer, baselineAffinityNodes, calculated);
- }
+ if (hasBaseline && changedBaseline) {
+ recalculateBaselineAssignment(topVer, events, prevAssignment, sorted, blt);
assignment = IdealAffinityAssignment.createWithPreservedPrimaries(
topVer,
@@ -371,15 +358,11 @@ public class GridAffinityAssignmentCache {
baselineAssignment
);
}
- else if (hasBaseline && changedBaseline) {
- List<ClusterNode> baselineAffinityNodes = discoCache.state().baselineTopology()
- .createBaselineView(sorted, nodeFilter);
-
- List<List<ClusterNode>> calculated = aff.assignPartitions(new GridAffinityFunctionContextImpl(
- baselineAffinityNodes, prevAssignment != null ? prevAssignment.assignment() : null,
- events.lastEvent(), topVer, backups));
-
- baselineAssignment = IdealAffinityAssignment.create(topVer, baselineAffinityNodes, calculated);
+ else if (skipCalculation)
+ assignment = prevAssignment;
+ else if (hasBaseline && !changedBaseline) {
+ if (baselineAssignment == null)
+ recalculateBaselineAssignment(topVer, events, prevAssignment, sorted, blt);
assignment = IdealAffinityAssignment.createWithPreservedPrimaries(
topVer,
@@ -388,28 +371,20 @@ public class GridAffinityAssignmentCache {
);
}
else {
- List<List<ClusterNode>> calculated = aff.assignPartitions(new GridAffinityFunctionContextImpl(sorted,
- prevAssignment != null ? prevAssignment.assignment() : null,
- events.lastEvent(), topVer, backups));
+ List<List<ClusterNode>> calculated = aff.assignPartitions(new GridAffinityFunctionContextImpl(
+ sorted,
+ prevAssignment.assignment(),
+ events.lastEvent(),
+ topVer,
+ backups
+ ));
assignment = IdealAffinityAssignment.create(topVer, sorted, calculated);
}
}
else {
- DiscoveryEvent event = null;
-
- if (events != null)
- event = events.lastEvent();
-
if (hasBaseline) {
- List<ClusterNode> baselineAffinityNodes = discoCache.state().baselineTopology()
- .createBaselineView(sorted, nodeFilter);
-
- List<List<ClusterNode>> calculated = aff.assignPartitions(new GridAffinityFunctionContextImpl(
- baselineAffinityNodes, prevAssignment != null ? prevAssignment.assignment() : null,
- event, topVer, backups));
-
- baselineAssignment = IdealAffinityAssignment.create(topVer, baselineAffinityNodes, calculated);
+ recalculateBaselineAssignment(topVer, events, prevAssignment, sorted, blt);
assignment = IdealAffinityAssignment.createWithPreservedPrimaries(
topVer,
@@ -420,7 +395,10 @@ public class GridAffinityAssignmentCache {
else {
List<List<ClusterNode>> calculated = aff.assignPartitions(new GridAffinityFunctionContextImpl(sorted,
prevAssignment != null ? prevAssignment.assignment() : null,
- event, topVer, backups));
+ events != null ? events.lastEvent() : null,
+ topVer,
+ backups
+ ));
assignment = IdealAffinityAssignment.create(topVer, sorted, calculated);
}
@@ -434,7 +412,8 @@ public class GridAffinityAssignmentCache {
printDistributionIfThresholdExceeded(assignment.assignment(), sorted.size());
if (hasBaseline) {
- baselineTopology = discoCache.state().baselineTopology();
+ baselineTopology = blt;
+
assert baselineAssignment != null;
}
else {
@@ -450,6 +429,33 @@ public class GridAffinityAssignmentCache {
/**
* @param topVer Topology version.
+ * @param events Events.
+ * @param prevAssignment Previous assignment.
+ * @param sorted Sorted cache group nodes.
+ * @param blt Baseline topology.
+ */
+ private void recalculateBaselineAssignment(
+ AffinityTopologyVersion topVer,
+ ExchangeDiscoveryEvents events,
+ IdealAffinityAssignment prevAssignment,
+ List<ClusterNode> sorted,
+ BaselineTopology blt
+ ) {
+ List<ClusterNode> baselineAffinityNodes = blt.createBaselineView(sorted, nodeFilter);
+
+ List<List<ClusterNode>> calculated = aff.assignPartitions(new GridAffinityFunctionContextImpl(
+ baselineAffinityNodes,
+ prevAssignment != null ? prevAssignment.assignment() : null,
+ events != null ? events.lastEvent() : null,
+ topVer,
+ backups
+ ));
+
+ baselineAssignment = IdealAffinityAssignment.create(topVer, baselineAffinityNodes, calculated);
+ }
+
+ /**
+ * @param topVer Topology version.
* @return Baseline assignment with filtered out offline nodes.
*/
private List<List<ClusterNode>> baselineAssignmentWithoutOfflineNodes(AffinityTopologyVersion topVer) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 203ffb7..9a80d8a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -3031,8 +3031,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
affFunc,
ccfg.getNodeFilter(),
ccfg.getBackups(),
- ccfg.getCacheMode() == LOCAL,
- grpDesc.persistenceEnabled());
+ ccfg.getCacheMode() == LOCAL
+ );
return new CacheGroupNoAffOrFiltredHolder(ccfg.getRebalanceMode() != NONE, cctx, aff, initAff);
}
@@ -3078,8 +3078,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
affFunc,
ccfg.getNodeFilter(),
ccfg.getBackups(),
- ccfg.getCacheMode() == LOCAL,
- grpDesc.persistenceEnabled());
+ ccfg.getCacheMode() == LOCAL
+ );
return new CacheGroupNoAffOrFiltredHolder(ccfg.getRebalanceMode() != NONE, cctx, aff, initAff);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
index 0960b4b..29efa76 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
@@ -1072,8 +1072,8 @@ public class CacheGroupContext {
ccfg.getAffinity(),
ccfg.getNodeFilter(),
ccfg.getBackups(),
- ccfg.getCacheMode() == LOCAL,
- persistenceEnabled());
+ ccfg.getCacheMode() == LOCAL
+ );
if (ccfg.getCacheMode() != LOCAL)
top = new GridDhtPartitionTopologyImpl(ctx, this);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index cd425ab..774c74a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -518,7 +518,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
ExchangeActions exchActs = null;
- if (evt.type() == EVT_NODE_JOINED && evt.eventNode().isLocal()) {
+ boolean locJoin = evt.type() == EVT_NODE_JOINED && evt.eventNode().isLocal();
+
+ if (locJoin) {
LocalJoinCachesContext locJoinCtx = cctx.cache().localJoinCachesContext();
if (locJoinCtx != null) {
@@ -528,6 +530,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
}
+ if (!n.isClient() && !n.isDaemon())
+ exchActs = cctx.kernalContext().state().autoAdjustExchangeActions(exchActs);
+
exchFut = exchangeFuture(exchId, evt, cache, exchActs, null);
}
else {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 32a8f5b..783939c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -59,6 +59,7 @@ import org.apache.ignite.internal.processors.cache.transactions.TransactionMetri
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager;
import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
+import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.GridIntList;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
@@ -294,6 +295,8 @@ public class GridCacheSharedContext<K, V> {
if (msgLog.isInfoEnabled())
msgLog.info("Components activation performed in " + (System.currentTimeMillis() - time) + " ms.");
+
+ ((DistributedMetaStorageImpl)kernalCtx.distributedMetastorage()).inMemoryReadyForWrite();
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 6c0b041..4d4bc1b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -99,6 +99,7 @@ import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteReducer;
import org.apache.ignite.lifecycle.LifecycleAware;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.plugin.CachePluginConfiguration;
import org.apache.ignite.plugin.security.SecurityException;
import org.apache.ignite.spi.encryption.EncryptionSpi;
@@ -1858,6 +1859,36 @@ public class GridCacheUtils {
}
/**
+ * @param nodes Nodes to check.
+ * @param cfg Configuration used to resolve the class loader.
+ * @return {@code true} if cluster has only in-memory nodes.
+ */
+ public static boolean isInMemoryCluster(Collection<ClusterNode> nodes, IgniteConfiguration cfg) {
+ return nodes.stream().allMatch(serNode -> !CU.isPersistenceEnabled(extractDataStorage(serNode, cfg)));
+ }
+
+ /**
+ * Extract and unmarshal data storage configuration from given node.
+ *
+ * @param node Source of data storage configuration.
+ * @return Data storage configuration for given node.
+ */
+ private static DataStorageConfiguration extractDataStorage(ClusterNode node, IgniteConfiguration cfg) {
+ Object dsCfgBytes = node.attribute(IgniteNodeAttributes.ATTR_DATA_STORAGE_CONFIG);
+
+ if (dsCfgBytes instanceof byte[]) {
+ try {
+ return new JdkMarshaller().unmarshal((byte[])dsCfgBytes, U.resolveClassLoader(cfg));
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ return null;
+ }
+
+ /**
* @return {@code true} if persistence is enabled for at least one data region, {@code false} if not.
*/
public static boolean isPersistenceEnabled(IgniteConfiguration cfg) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 8837de2..d1b1f22 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -117,7 +117,6 @@ import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteRunnable;
import org.jetbrains.annotations.Nullable;
@@ -3513,13 +3512,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
IgniteInternalFuture<Map<Integer, Map<Integer, List<UUID>>>> fut = cctx.affinity().initAffinityOnNodeLeft(this);
- if (!fut.isDone()) {
- fut.listen(new IgniteInClosure<IgniteInternalFuture<Map<Integer, Map<Integer, List<UUID>>>>>() {
- @Override public void apply(IgniteInternalFuture<Map<Integer, Map<Integer, List<UUID>>>> fut) {
- onAffinityInitialized(fut);
- }
- });
- }
+ if (!fut.isDone())
+ fut.listen(this::onAffinityInitialized);
else
onAffinityInitialized(fut);
}
@@ -3609,15 +3603,17 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
cctx.kernalContext().state().onExchangeFinishedOnCoordinator(this, hasMoving);
}
- boolean active = !stateChangeErr && req.activate();
+ if (!cctx.kernalContext().state().clusterState().localBaselineAutoAdjustment()) {
+ boolean active = !stateChangeErr && req.activate();
- ChangeGlobalStateFinishMessage stateFinishMsg = new ChangeGlobalStateFinishMessage(
- req.requestId(),
- active,
- !stateChangeErr
- );
+ ChangeGlobalStateFinishMessage stateFinishMsg = new ChangeGlobalStateFinishMessage(
+ req.requestId(),
+ active,
+ !stateChangeErr
+ );
- cctx.discovery().sendCustomEvent(stateFinishMsg);
+ cctx.discovery().sendCustomEvent(stateFinishMsg);
+ }
timeBag.finishGlobalStage("State finish message sending");
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index 3921106..bbd2480 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -127,7 +127,8 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
public GridDhtPartitionsSingleMessage(GridDhtPartitionExchangeId exchId,
boolean client,
@Nullable GridCacheVersion lastVer,
- boolean compress) {
+ boolean compress
+ ) {
super(exchId, lastVer);
this.client = client;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java
index b3879ee..6f607d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java
@@ -79,6 +79,9 @@ public class DiscoveryDataClusterState implements Serializable {
/** Transition result error. */
private transient volatile Exception transitionError;
+ /** Local baseline autoadjustment flag. */
+ private transient volatile boolean locBaselineAutoAdjustment;
+
/**
* @param active Current status.
* @return State instance.
@@ -246,6 +249,24 @@ public class DiscoveryDataClusterState implements Serializable {
}
/**
+ * @return {@code true} if current state was created as a result of local baseline autoadjustment with zero timeout
+ * on in-memory cluster.
+ */
+ public boolean localBaselineAutoAdjustment() {
+ return locBaselineAutoAdjustment;
+ }
+
+ /**
+ * Set local baseline autoadjustment flag.
+ *
+ * @param adjusted Flag value.
+ * @see #localBaselineAutoAdjustment()
+ */
+ public void localBaselineAutoAdjustment(boolean adjusted) {
+ locBaselineAutoAdjustment = adjusted;
+ }
+
+ /**
* Creates a non-transitional cluster state. This method effectively cleans all fields identifying the
* state as transitional and creates a new state with the state transition result.
*
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index 23901cc..83c36fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterGroupAdapter;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.cluster.DistributedBaselineConfiguration;
+import org.apache.ignite.internal.cluster.IgniteClusterImpl;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
@@ -372,8 +373,10 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
&& !ctx.isDaemon()
&& ctx.config().isAutoActivationEnabled()
&& !state.active()
- && isBaselineSatisfied(state.baselineTopology(), discoCache.serverNodes()))
- changeGlobalState(true, state.baselineTopology().currentBaseline(), false);
+ && !inMemoryMode
+ && isBaselineSatisfied(state.baselineTopology(), discoCache.serverNodes())
+ )
+ changeGlobalState(true, state.baselineTopology().currentBaseline(), false);
return null;
}
@@ -385,12 +388,9 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
if (blt == null || blt.consistentIds() == null || ctx.clientNode() || ctx.isDaemon())
return;
- if (!CU.isPersistenceEnabled(ctx.config()))
- return;
-
if (!blt.consistentIds().contains(ctx.discovery().localNode().consistentId())) {
U.quietAndInfo(log, "Local node is not included in Baseline Topology and will not be used " +
- "for persistent data storage. Use control.(sh|bat) script or IgniteCluster interface to include " +
+ "for data storage. Use control.(sh|bat) script or IgniteCluster interface to include " +
"the node to Baseline Topology.");
}
}
@@ -792,9 +792,6 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
boolean forceChangeBaselineTopology,
boolean isAutoAdjust
) {
- if (inMemoryMode)
- return changeGlobalState0(activate, null, false, isAutoAdjust);
-
BaselineTopology newBlt = (compatibilityMode && !forceChangeBaselineTopology) ? null :
calculateNewBaselineTopology(activate, baselineNodes, forceChangeBaselineTopology);
@@ -925,7 +922,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
List<StoredCacheData> storedCfgs = null;
- if (activate && CU.isPersistenceEnabled(ctx.config())) {
+ if (activate && !inMemoryMode) {
try {
Map<String, StoredCacheData> cfgs = ctx.cache().context().pageStore().readCacheConfigurations();
@@ -1118,7 +1115,9 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
IgniteCompute comp = ((ClusterGroupAdapter)ctx.cluster().get().forServers()).compute();
- IgniteFuture<Void> fut = comp.runAsync(new ClientChangeGlobalStateComputeRequest(activate, blt, forceBlt));
+ IgniteFuture<Void> fut = comp.runAsync(
+ new ClientChangeGlobalStateComputeRequest(activate, blt, forceBlt)
+ );
fut.listen(new CI1<IgniteFuture>() {
@Override public void apply(IgniteFuture fut) {
@@ -1366,6 +1365,113 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
}
}
+ /**
+ * Update baseline locally if cluster is not persistent and baseline autoadjustment is enabled with zero timeout.
+ *
+ * @param nodeId Id of the node that initiated the operation (joined/left/failed).
+ * @param topSnapshot Topology snapshot from the discovery message.
+ * @param discoCache Discovery cache from the discovery manager.
+ * @param topVer Topology version.
+ * @param minorTopVer Minor topology version.
+ * @return {@code true} if baseline was changed.
+ */
+ public boolean autoAdjustInMemoryClusterState(
+ UUID nodeId,
+ Collection<ClusterNode> topSnapshot,
+ DiscoCache discoCache,
+ long topVer,
+ int minorTopVer
+ ) {
+ IgniteClusterImpl cluster = ctx.cluster().get();
+
+ DiscoveryDataClusterState oldState = globalState;
+
+ boolean autoAdjustBaseline = CU.isInMemoryCluster(ctx.discovery().allNodes(), ctx.config())
+ && oldState.active()
+ && !oldState.transition()
+ && cluster.isBaselineAutoAdjustEnabled()
+ && cluster.baselineAutoAdjustTimeout() == 0L;
+
+ if (autoAdjustBaseline) {
+ BaselineTopology oldBlt = oldState.baselineTopology();
+
+ Collection<ClusterNode> bltNodes = topSnapshot.stream()
+ .filter(n -> !n.isClient() && !n.isDaemon())
+ .collect(Collectors.toList());
+
+ if (!bltNodes.isEmpty()) {
+ int newBltId = oldBlt == null ? 0 : oldBlt.id();
+
+ BaselineTopology newBlt = BaselineTopology.build(bltNodes, newBltId);
+
+ ChangeGlobalStateMessage changeGlobalStateMsg = new ChangeGlobalStateMessage(
+ nodeId,
+ nodeId,
+ null,
+ true,
+ newBlt,
+ true,
+ System.currentTimeMillis()
+ );
+
+ AffinityTopologyVersion ver = new AffinityTopologyVersion(topVer, minorTopVer);
+
+ onStateChangeMessage(ver, changeGlobalStateMsg, discoCache);
+
+ ChangeGlobalStateFinishMessage finishMsg = new ChangeGlobalStateFinishMessage(nodeId, true, true);
+
+ onStateFinishMessage(finishMsg);
+
+ globalState.localBaselineAutoAdjustment(true);
+
+ ctx.discovery().updateTopologySnapshot();
+
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * Add fake state change request into exchange actions if cluster is not persistent and baseline autoadjustment
+ * is enabled with zero timeout.
+ *
+ * @param exchActs Current exchange actions.
+ * @return New exchange actions.
+ */
+ public ExchangeActions autoAdjustExchangeActions(ExchangeActions exchActs) {
+ DiscoveryDataClusterState clusterState = globalState;
+
+ if (clusterState.localBaselineAutoAdjustment()) {
+ BaselineTopology blt = clusterState.baselineTopology();
+
+ ChangeGlobalStateMessage msg = new ChangeGlobalStateMessage(
+ UUID.randomUUID(),
+ ctx.localNodeId(),
+ null,
+ true,
+ blt,
+ true,
+ System.currentTimeMillis()
+ );
+
+ StateChangeRequest stateChangeReq = new StateChangeRequest(
+ msg,
+ BaselineTopologyHistoryItem.fromBaseline(blt),
+ false,
+ null
+ );
+
+ if (exchActs == null)
+ exchActs = new ExchangeActions();
+
+ exchActs.stateChangeRequest(stateChangeReq);
+ }
+
+ return exchActs;
+ }
+
/** {@inheritDoc} */
@Override public void onExchangeFinishedOnCoordinator(IgniteInternalFuture exchangeFuture, boolean hasMovingPartitions) {
// no-op
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/baseline/autoadjust/ChangeTopologyWatcher.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/baseline/autoadjust/ChangeTopologyWatcher.java
index 571a3cc..a11a4ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/baseline/autoadjust/ChangeTopologyWatcher.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/baseline/autoadjust/ChangeTopologyWatcher.java
@@ -21,7 +21,6 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.DistributedBaselineConfiguration;
import org.apache.ignite.internal.cluster.IgniteClusterImpl;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
@@ -29,7 +28,7 @@ import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor;
-import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.internal.util.typedef.internal.CU;
import static org.apache.ignite.internal.processors.cluster.baseline.autoadjust.BaselineAutoAdjustData.NULL_BASELINE_DATA;
import static org.apache.ignite.internal.util.IgniteUtils.isLocalNodeCoordinator;
@@ -69,14 +68,14 @@ public class ChangeTopologyWatcher implements GridLocalEventListener {
ctx.log(BaselineAutoAdjustExecutor.class),
cluster,
ctx.getSystemExecutorService(),
- this::isBaselineAutoAdjustEnabled
+ this::isTopologyWatcherEnabled
), ctx.log(BaselineAutoAdjustScheduler.class));
this.discoveryMgr = ctx.discovery();
}
/** {@inheritDoc} */
@Override public void onEvent(Event evt) {
- if (!isBaselineAutoAdjustEnabled()) {
+ if (!isTopologyWatcherEnabled()) {
synchronized (this) {
lastBaselineData = NULL_BASELINE_DATA;
}
@@ -94,7 +93,7 @@ public class ChangeTopologyWatcher implements GridLocalEventListener {
if (isLocalNodeCoordinator(discoveryMgr)) {
exchangeManager.affinityReadyFuture(new AffinityTopologyVersion(discoEvt.topologyVersion()))
- .listen((IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>)future -> {
+ .listen(future -> {
if (future.error() != null)
return;
@@ -118,8 +117,9 @@ public class ChangeTopologyWatcher implements GridLocalEventListener {
/**
* @return {@code true} if auto-adjust baseline enabled.
*/
- private boolean isBaselineAutoAdjustEnabled() {
- return stateProcessor.clusterState().active() && baselineConfiguration.isBaselineAutoAdjustEnabled();
+ private boolean isTopologyWatcherEnabled() {
+ return stateProcessor.clusterState().active() && baselineConfiguration.isBaselineAutoAdjustEnabled()
+ && (CU.isPersistenceEnabled(cluster.ignite().configuration()) || cluster.baselineAutoAdjustTimeout() != 0L);
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasMessage.java
index a357277..fa27959 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasMessage.java
@@ -62,6 +62,6 @@ class DistributedMetaStorageCasMessage extends DistributedMetaStorageUpdateMessa
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(DistributedMetaStorageCasMessage.class, this);
+ return S.toString(DistributedMetaStorageCasMessage.class, this, super.toString());
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java
index e806a32..f9bba3c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java
@@ -266,13 +266,14 @@ public class DistributedMetaStorageImpl extends GridProcessorAdapter
* {@code readyForRead} and then {@code readyForWrite} if {@code active} flag happened to be {@code true}.
*/
@Override public void onKernalStart(boolean active) throws IgniteCheckedException {
+
+ }
+
+ public void inMemoryReadyForRead() {
if (!isPersistenceEnabled) {
for (DistributedMetastorageLifecycleListener subscriber : isp.getDistributedMetastorageSubscribers())
subscriber.onReadyForRead(this);
}
-
- if (active)
- onActivate(ctx);
}
/**
@@ -282,6 +283,10 @@ public class DistributedMetaStorageImpl extends GridProcessorAdapter
* {@code readyForWrite}.
*/
@Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException {
+
+ }
+
+ public void inMemoryReadyForWrite() throws IgniteCheckedException {
if (!isPersistenceEnabled) {
lock.writeLock().lock();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateMessage.java
index dafc4a9..c693004 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateMessage.java
@@ -22,6 +22,7 @@ import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
@@ -35,9 +36,11 @@ class DistributedMetaStorageUpdateMessage implements DiscoveryCustomMessage {
private final IgniteUuid id = IgniteUuid.randomUuid();
/** Request ID. */
+ @GridToStringInclude
private final UUID reqId;
/** */
+ @GridToStringInclude
private final String key;
/** */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 999b440..395705c 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -10976,12 +10976,15 @@ public abstract class IgniteUtils {
/**
* @return {@code true} if local node is coordinator.
*/
- public static boolean isLocalNodeCoordinator(GridDiscoveryManager discoveryManager) {
- DiscoverySpi spi = discoveryManager.getInjectedDiscoverySpi();
+ public static boolean isLocalNodeCoordinator(GridDiscoveryManager discoMgr) {
+ if (discoMgr.localNode().isClient() || discoMgr.localNode().isDaemon())
+ return false;
+
+ DiscoverySpi spi = discoMgr.getInjectedDiscoverySpi();
return spi instanceof TcpDiscoverySpi
? ((TcpDiscoverySpi)spi).isLocalNodeCoordinator()
- : F.eq(discoveryManager.localNode(), U.oldest(discoveryManager.aliveServerNodes(), null));
+ : F.eq(discoMgr.localNode(), U.oldest(discoMgr.aliveServerNodes(), null));
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/IgniteDiscoveryThread.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/IgniteDiscoveryThread.java
index a3e376c..83e5a86 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/IgniteDiscoveryThread.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/IgniteDiscoveryThread.java
@@ -16,8 +16,12 @@
*/
package org.apache.ignite.spi.discovery;
+import org.apache.ignite.internal.util.worker.GridWorker;
+
/**
* Marker interface for discovery thread on cluster server node.
*/
public interface IgniteDiscoveryThread {
+ /** */
+ GridWorker worker();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index d6a1a0c..85314b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -275,9 +275,6 @@ class ServerImpl extends TcpDiscoveryImpl {
private final ConcurrentMap<InetSocketAddress, GridPingFutureAdapter<IgniteBiTuple<UUID, Boolean>>> pingMap =
new ConcurrentHashMap<>();
- /** Last listener future. */
- private IgniteFuture<?> lastCustomEvtLsnrFut;
-
/**
* Maximum size of history of IDs of server nodes ever tried to join current topology (ever sent join request).
*/
@@ -568,7 +565,7 @@ class ServerImpl extends TcpDiscoveryImpl {
Map<Long, Collection<ClusterNode>> hist = updateTopologyHistory(topVer,
Collections.unmodifiableList(top));
- lsnr.onDiscovery(EVT_NODE_FAILED, topVer, n, top, hist, null);
+ lsnr.onDiscovery(EVT_NODE_FAILED, topVer, n, top, hist, null).get();
}
}
}
@@ -2254,20 +2251,6 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
- * Wait for all the listeners from previous discovery message to be completed.
- */
- private void waitForLastCustomEventListenerFuture() {
- if (lastCustomEvtLsnrFut != null) {
- try {
- lastCustomEvtLsnrFut.get();
- }
- finally {
- lastCustomEvtLsnrFut = null;
- }
- }
- }
-
- /**
* Discovery messages history used for client reconnect.
*/
private class EnsuredMessageHistory {
@@ -4492,15 +4475,6 @@ class ServerImpl extends TcpDiscoveryImpl {
private void processNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) {
assert msg != null;
- blockingSectionBegin();
-
- try {
- waitForLastCustomEventListenerFuture();
- }
- finally {
- blockingSectionEnd();
- }
-
TcpDiscoveryNode node = msg.node();
assert node != null;
@@ -5999,8 +5973,6 @@ class ServerImpl extends TcpDiscoveryImpl {
blockingSectionEnd();
}
}
- else
- lastCustomEvtLsnrFut = fut;
if (msgObj.isMutable()) {
try {
@@ -7508,6 +7480,11 @@ class ServerImpl extends TcpDiscoveryImpl {
private MessageWorkerDiscoveryThread(GridWorker worker, IgniteLogger log) {
super(worker, log);
}
+
+ /** {@inheritDoc} */
+ @Override public GridWorker worker() {
+ return worker;
+ }
}
/**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java
index 80262cd..965110a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java
@@ -439,9 +439,11 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest
int minorVer = 1;
- if (initiallyActive && persistenceEnabled()) {
+ if (initiallyActive) {
ignite(0).cluster().active(true);
+ awaitPartitionMapExchange();
+
minorVer++;
}
@@ -1389,6 +1391,11 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest
* @param exp {@code True} if expect that cache is started on node.
*/
void checkCache(Ignite node, String cacheName, boolean exp) throws IgniteCheckedException {
+ GridTestUtils.waitForCondition(
+ () -> ((IgniteEx)node).context().cache().context().exchange().lastTopologyFuture() != null,
+ 1000
+ );
+
((IgniteEx)node).context().cache().context().exchange().lastTopologyFuture().get();
((IgniteEx)node).context().state().publicApiActiveState(true);
@@ -1406,7 +1413,7 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest
*/
final void checkNoCaches(int nodes) {
for (int i = 0; i < nodes; i++) {
- grid(i).context().state().publicApiActiveState(true);
+ assertFalse(grid(i).context().state().publicApiActiveState(true));
GridCacheProcessor cache = ((IgniteEx)ignite(i)).context().cache();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartNoExchangeTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartNoExchangeTimeoutTest.java
index 324b75e..55ca9fc 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartNoExchangeTimeoutTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartNoExchangeTimeoutTest.java
@@ -57,6 +57,8 @@ public class IgniteDynamicCacheStartNoExchangeTimeoutTest extends GridCommonAbst
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+ cfg.setConsistentId(igniteInstanceName);
+
cfg.setCommunicationSpi(new TestCommunicationSpi());
if (igniteInstanceName.equals(getTestIgniteInstanceName(NODES - 1)))
@@ -131,9 +133,7 @@ public class IgniteDynamicCacheStartNoExchangeTimeoutTest extends GridCommonAbst
IgniteEx ignite = grid(0);
- assertEquals(1L, ignite.localNode().order());
-
- ccfg.setNodeFilter(new TestFilterExcludeOldest());
+ ccfg.setNodeFilter(new TestFilterExcludeNode(ignite.localNode().consistentId()));
assertNotNull(ignite.getOrCreateCache(ccfg));
@@ -161,9 +161,7 @@ public class IgniteDynamicCacheStartNoExchangeTimeoutTest extends GridCommonAbst
IgniteEx ignite0 = grid(0);
IgniteEx ignite1 = grid(1);
- assertEquals(1L, ignite0.localNode().order());
-
- ccfg.setNodeFilter(new TestFilterExcludeOldest());
+ ccfg.setNodeFilter(new TestFilterExcludeNode(ignite0.localNode().consistentId()));
assertNotNull(ignite1.getOrCreateCache(ccfg));
@@ -192,15 +190,15 @@ public class IgniteDynamicCacheStartNoExchangeTimeoutTest extends GridCommonAbst
IgniteEx ignite = grid(1);
- assertEquals(2, ignite.localNode().order());
-
- ccfg.setNodeFilter(new TestFilterExcludeNode(2));
+ ccfg.setNodeFilter(new TestFilterExcludeNode(ignite.localNode().consistentId()));
assertNotNull(ignite.getOrCreateCache(ccfg));
awaitPartitionMapExchange();
checkCache(ccfg.getName());
+
+ ccfg.setNodeFilter(null);
}
/**
@@ -222,9 +220,7 @@ public class IgniteDynamicCacheStartNoExchangeTimeoutTest extends GridCommonAbst
IgniteEx ignite0 = grid(0);
IgniteEx ignite1 = grid(1);
- assertEquals(2L, ignite1.localNode().order());
-
- ccfg.setNodeFilter(new TestFilterExcludeNode(2));
+ ccfg.setNodeFilter(new TestFilterExcludeNode(ignite1.localNode().consistentId()));
assertNotNull(ignite0.getOrCreateCache(ccfg));
@@ -242,11 +238,9 @@ public class IgniteDynamicCacheStartNoExchangeTimeoutTest extends GridCommonAbst
public void testOldestChanged1() throws Exception {
IgniteEx ignite0 = grid(0);
- assertEquals(1L, ignite0.localNode().order());
-
CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
- ccfg.setNodeFilter(new TestFilterExcludeOldest());
+ ccfg.setNodeFilter(new TestFilterExcludeNode(ignite0.localNode().consistentId()));
assertNotNull(ignite(1).getOrCreateCache(ccfg));
@@ -268,13 +262,9 @@ public class IgniteDynamicCacheStartNoExchangeTimeoutTest extends GridCommonAbst
*/
@Test
public void testOldestChanged2() throws Exception {
- IgniteEx ignite0 = grid(0);
-
- assertEquals(1L, ignite0.localNode().order());
-
CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
- ccfg.setNodeFilter(new TestFilterIncludeNode(3));
+ ccfg.setNodeFilter(new TestFilterIncludeNode(ignite(2).cluster().localNode().consistentId()));
assertNotNull(ignite(1).getOrCreateCache(ccfg));
@@ -296,11 +286,9 @@ public class IgniteDynamicCacheStartNoExchangeTimeoutTest extends GridCommonAbst
public void testOldestChanged3() throws Exception {
IgniteEx ignite0 = grid(0);
- assertEquals(1L, ignite0.localNode().order());
-
CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
- ccfg.setNodeFilter(new TestFilterIncludeNode(3));
+ ccfg.setNodeFilter(new TestFilterIncludeNode(ignite(2).cluster().localNode().consistentId()));
assertNotNull(ignite(1).getOrCreateCache(ccfg));
@@ -461,18 +449,18 @@ public class IgniteDynamicCacheStartNoExchangeTimeoutTest extends GridCommonAbst
*/
private static class TestFilterExcludeNode implements IgnitePredicate<ClusterNode> {
/** */
- private final long excludeOrder;
+ private Object excludeConsistentId;
/**
- * @param excludeOrder Node order to exclude.
+ * @param excludeConsistentId Node consistent ID to exclude.
*/
- public TestFilterExcludeNode(long excludeOrder) {
- this.excludeOrder = excludeOrder;
+ public TestFilterExcludeNode(Object excludeConsistentId) {
+ this.excludeConsistentId = excludeConsistentId;
}
/** {@inheritDoc} */
@Override public boolean apply(ClusterNode node) {
- return node.order() != excludeOrder;
+ return !node.consistentId().equals(excludeConsistentId);
}
}
@@ -481,18 +469,18 @@ public class IgniteDynamicCacheStartNoExchangeTimeoutTest extends GridCommonAbst
*/
private static class TestFilterIncludeNode implements IgnitePredicate<ClusterNode> {
/** */
- private final long includeOrder;
+ private final Object includeConsistentId;
/**
- * @param includeOrder Node order to exclude.
+ * @param includeConsistentId Node consistent ID to include.
*/
- public TestFilterIncludeNode(long includeOrder) {
- this.includeOrder = includeOrder;
+ public TestFilterIncludeNode(Object includeConsistentId) {
+ this.includeConsistentId = includeConsistentId;
}
/** {@inheritDoc} */
@Override public boolean apply(ClusterNode node) {
- return node.order() == includeOrder;
+ return node.consistentId().equals(includeConsistentId);
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheClientNodeBinaryObjectMetadataMultinodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheClientNodeBinaryObjectMetadataMultinodeTest.java
index 3783f65..c6dfa58 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheClientNodeBinaryObjectMetadataMultinodeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheClientNodeBinaryObjectMetadataMultinodeTest.java
@@ -26,17 +26,17 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteBinary;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.binary.BinaryObjectBuilder;
+import org.apache.ignite.binary.BinaryType;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.binary.BinaryMarshaller;
-import org.apache.ignite.binary.BinaryObjectBuilder;
-import org.apache.ignite.binary.BinaryType;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java
index 4292ba5..cc573e5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java
@@ -55,9 +55,6 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.apache.ignite.transactions.Transaction;
-import org.apache.ignite.transactions.TransactionConcurrency;
-import org.apache.ignite.transactions.TransactionIsolation;
import org.junit.Test;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_BASELINE_AUTO_ADJUST_ENABLED;
@@ -834,7 +831,7 @@ public class CacheBaselineTopologyTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
@Test
- public void testNonPersistentCachesIgnoreBaselineTopology() throws Exception {
+ public void testNonPersistentCachesDontIgnoreBaselineTopology() throws Exception {
Ignite ig = startGrids(4);
ig.cluster().active(true);
@@ -849,15 +846,15 @@ public class CacheBaselineTopologyTest extends GridCommonAbstractTest {
awaitPartitionMapExchange();
assertEquals(0, ig.affinity(persistentCache.getName()).allPartitions(newNode.cluster().localNode()).length);
- assertTrue(ig.affinity(inMemoryCache.getName()).allPartitions(newNode.cluster().localNode()).length > 0);
+ assertEquals(0, ig.affinity(inMemoryCache.getName()).allPartitions(newNode.cluster().localNode()).length);
}
/**
* @throws Exception If failed.
*/
@Test
- public void testNotMapNonBaselineTxPrimaryNodes() throws Exception {
- checkNotMapNonBaselineTxNodes(true, false);
+ public void testMapTxPrimaryNodes() throws Exception {
+ checkMapTxNodes(true, false);
}
/**
@@ -865,16 +862,16 @@ public class CacheBaselineTopologyTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
@Test
- public void testNotMapNonBaselineTxBackupNodes() throws Exception {
- checkNotMapNonBaselineTxNodes(false, false);
+ public void testMapTxBackupNodes() throws Exception {
+ checkMapTxNodes(false, false);
}
/**
* @throws Exception If failed.
*/
@Test
- public void testNotMapNonBaselineNearTxPrimaryNodes() throws Exception {
- checkNotMapNonBaselineTxNodes(true, true);
+ public void testMapNearTxPrimaryNodes() throws Exception {
+ checkMapTxNodes(true, true);
}
/**
@@ -882,8 +879,8 @@ public class CacheBaselineTopologyTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
@Test
- public void testNotMapNonBaselineNearTxBackupNodes() throws Exception {
- checkNotMapNonBaselineTxNodes(false, true);
+ public void testMapNearTxBackupNodes() throws Exception {
+ checkMapTxNodes(false, true);
}
/**
@@ -891,7 +888,7 @@ public class CacheBaselineTopologyTest extends GridCommonAbstractTest {
* @param near Whether non-baseline nod is near node.
* @throws Exception If failed.
*/
- public void checkNotMapNonBaselineTxNodes(boolean primary, boolean near) throws Exception {
+ public void checkMapTxNodes(boolean primary, boolean near) throws Exception {
System.setProperty(IgniteSystemProperties.IGNITE_WAL_LOG_TX_RECORDS, "true");
int bltNodesCnt = 3;
@@ -927,33 +924,7 @@ public class CacheBaselineTopologyTest extends GridCommonAbstractTest {
assertEquals(0, nearIgnite.affinity(persistentCache.getName()).allPartitions(nonBltNode).length);
- assertTrue(nearIgnite.affinity(inMemoryCache.getName()).allPartitions(nonBltNode).length > 0);
-
- ClusterNode nearNode = nearIgnite.cluster().localNode();
-
- try (Transaction tx = nearIgnite.transactions().txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.READ_COMMITTED)) {
- for (int i = 0; ; i++) {
- List<ClusterNode> nodes = new ArrayList<>(nearIgnite.affinity(inMemoryCache.getName())
- .mapKeyToPrimaryAndBackups(i));
-
- ClusterNode primaryNode = nodes.get(0);
-
- List<ClusterNode> backupNodes = nodes.subList(1, nodes.size());
-
- if (nonBltNode.equals(primaryNode) == primary) {
- if (backupNodes.contains(nonBltNode) != primary) {
- inMemoryCache.put(i, i);
-
- // add some persistent data in the same transaction
- for (int j = 0; j < 100; j++)
- persistentCache.put(j, j);
-
- break;
- }
- }
- }
- tx.commit();
- }
+ assertEquals(0, nearIgnite.affinity(inMemoryCache.getName()).allPartitions(nonBltNode).length);
}
/**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
index 6fc749d..face7e3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
@@ -1672,11 +1673,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
*/
@Test
public void testInitCacheReceivedOnJoin() throws Exception {
- cacheC = new IgniteClosure<String, CacheConfiguration[]>() {
- @Override public CacheConfiguration[] apply(String s) {
- return null;
- }
- };
+ cacheC = s -> null;
startServer(0, 1);
@@ -1684,11 +1681,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
checkAffinity(2, topVer(2, 1), true);
- cacheC = new IgniteClosure<String, CacheConfiguration[]>() {
- @Override public CacheConfiguration[] apply(String s) {
- return new CacheConfiguration[]{cacheConfiguration()};
- }
- };
+ cacheC = s -> new CacheConfiguration[]{cacheConfiguration()};
startServer(2, 3);
@@ -1696,14 +1689,12 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
checkAffinity(3, topVer(3, 1), true);
- cacheC = new IgniteClosure<String, CacheConfiguration[]>() {
- @Override public CacheConfiguration[] apply(String s) {
- CacheConfiguration ccfg = cacheConfiguration();
+ cacheC = s -> {
+ CacheConfiguration ccfg = cacheConfiguration();
- ccfg.setName(CACHE_NAME2);
+ ccfg.setName(CACHE_NAME2);
- return new CacheConfiguration[]{ccfg};
- }
+ return new CacheConfiguration[]{ccfg};
};
startClient(3, 4);
@@ -2605,7 +2596,8 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
private Map<String, List<List<ClusterNode>>> checkAffinity(int expNodes,
AffinityTopologyVersion topVer,
boolean expIdeal,
- boolean checkPublicApi) throws Exception {
+ boolean checkPublicApi
+ ) throws Exception {
List<Ignite> nodes = G.allGrids();
Map<String, List<List<ClusterNode>>> aff = new HashMap<>();
@@ -2697,11 +2689,14 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
if (!aff1.equals(aff2)) {
for (int i = 0; i < aff1.size(); i++) {
+ Collection<UUID> n1 = new ArrayList<>(F.nodeIds(aff1.get(i)));
+ Collection<UUID> n2 = new ArrayList<>(F.nodeIds(aff2.get(i)));
+
assertEquals("Wrong affinity [node=" + node.name() +
", topVer=" + topVer +
", cache=" + cacheName +
", part=" + i + ']',
- F.nodeIds(aff1.get(i)), F.nodeIds(aff2.get(i)));
+ n1, n2);
}
fail();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java
index 9333a1c..9f8bca2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java
@@ -33,7 +33,6 @@ import javax.cache.CacheException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.IgniteTransactions;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheRebalanceMode;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cluster/BaselineAutoAdjustInMemoryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cluster/BaselineAutoAdjustInMemoryTest.java
new file mode 100644
index 0000000..25c4217
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cluster/BaselineAutoAdjustInMemoryTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cluster;
+
+import java.util.UUID;
+import org.apache.ignite.internal.IgniteEx;
+import org.junit.Test;
+
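+/** Runs the {@link BaselineAutoAdjustTest} scenarios with persistence disabled, plus a zero-timeout scenario. */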
+public class BaselineAutoAdjustInMemoryTest extends BaselineAutoAdjustTest {
+ /** {@inheritDoc} */
+ @Override protected boolean isPersistent() {
+ return false;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testWithZeroTimeout() throws Exception {
+ startGrids(3);
+
+ startGrid(getConfiguration(UUID.randomUUID().toString()).setClientMode(true));
+
+ stopGrid(2);
+
+ assertEquals(2, grid(0).cluster().currentBaselineTopology().size());
+
+ startGrid(3);
+
+ assertEquals(3, grid(0).cluster().currentBaselineTopology().size());
+
+ startGrid(4);
+
+ assertEquals(4, grid(0).cluster().currentBaselineTopology().size());
+
+ stopGrid(1);
+
+ assertEquals(3, grid(0).cluster().currentBaselineTopology().size());
+
+ IgniteEx client = startGrid(getConfiguration(UUID.randomUUID().toString()).setClientMode(true));
+
+ assertEquals(3, grid(0).cluster().currentBaselineTopology().size());
+
+ stopGrid(client.name());
+
+ assertEquals(3, grid(0).cluster().currentBaselineTopology().size());
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cluster/BaselineAutoAdjustTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cluster/BaselineAutoAdjustTest.java
index 22e6abe..e0fb95d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cluster/BaselineAutoAdjustTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cluster/BaselineAutoAdjustTest.java
@@ -37,6 +37,11 @@ import org.junit.Test;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_BASELINE_AUTO_ADJUST_ENABLED;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assume.assumeTrue;
/**
*
@@ -46,7 +51,7 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
/** */
private static final String TEST_NAME = "TEST_NAME";
/** */
- private static int AUTO_ADJUST_TIMEOUT = 5000;
+ private static int autoAdjustTimeout = 5000;
/**
* @throws Exception if failed.
@@ -55,9 +60,10 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
public void before() throws Exception {
stopAllGrids();
- cleanPersistenceDir();
+ if (isPersistent())
+ cleanPersistenceDir();
- AUTO_ADJUST_TIMEOUT = 5000;
+ autoAdjustTimeout = 5000;
}
/**
@@ -67,7 +73,8 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
public void after() throws Exception {
stopAllGrids();
- cleanPersistenceDir();
+ if (isPersistent())
+ cleanPersistenceDir();
}
/** {@inheritDoc} */
@@ -79,7 +86,7 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
DataStorageConfiguration storageCfg = new DataStorageConfiguration();
storageCfg.getDefaultDataRegionConfiguration()
- .setPersistenceEnabled(true)
+ .setPersistenceEnabled(isPersistent())
.setMaxSize(500L * 1024 * 1024);
cfg.setDataStorageConfiguration(storageCfg);
@@ -87,6 +94,11 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
return cfg;
}
+ /** */
+ protected boolean isPersistent() {
+ return true;
+ }
+
/**
* @throws Exception if failed.
*/
@@ -96,7 +108,7 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
ignite0.cluster().active(true);
- ignite0.cluster().baselineAutoAdjustTimeout(AUTO_ADJUST_TIMEOUT);
+ ignite0.cluster().baselineAutoAdjustTimeout(autoAdjustTimeout);
Set<Object> initBaseline = ignite0.cluster().currentBaselineTopology().stream()
.map(BaselineNode::consistentId)
@@ -112,7 +124,7 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
assertTrue(waitForCondition(
() -> isCurrentBaselineFromOneNode(ignite0),
- AUTO_ADJUST_TIMEOUT * 2
+ autoAdjustTimeout * 2
));
}
@@ -125,7 +137,7 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
ignite0.cluster().active(true);
- ignite0.cluster().baselineAutoAdjustTimeout(AUTO_ADJUST_TIMEOUT);
+ ignite0.cluster().baselineAutoAdjustTimeout(autoAdjustTimeout);
Set<Object> initBaseline = ignite0.cluster().currentBaselineTopology().stream()
.map(BaselineNode::consistentId)
@@ -133,11 +145,11 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
stopGrid(1);
- doSleep(AUTO_ADJUST_TIMEOUT / 2);
+ doSleep(autoAdjustTimeout / 2);
stopGrid(2);
- doSleep(AUTO_ADJUST_TIMEOUT / 2);
+ doSleep(autoAdjustTimeout / 2);
Set<Object> twoNodeLeftBaseline = ignite0.cluster().currentBaselineTopology().stream()
.map(BaselineNode::consistentId)
@@ -147,7 +159,7 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
assertTrue(waitForCondition(
() -> isCurrentBaselineFromOneNode(ignite0),
- AUTO_ADJUST_TIMEOUT * 2
+ autoAdjustTimeout * 2
));
}
@@ -160,7 +172,7 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
ignite0.cluster().active(true);
- ignite0.cluster().baselineAutoAdjustTimeout(AUTO_ADJUST_TIMEOUT);
+ ignite0.cluster().baselineAutoAdjustTimeout(autoAdjustTimeout);
Set<Object> initBaseline = ignite0.cluster().currentBaselineTopology().stream()
.map(BaselineNode::consistentId)
@@ -168,11 +180,11 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
stopGrid(1);
- doSleep(AUTO_ADJUST_TIMEOUT / 2);
+ doSleep(autoAdjustTimeout / 2);
stopGrid(0);
- doSleep(AUTO_ADJUST_TIMEOUT / 2);
+ doSleep(autoAdjustTimeout / 2);
Ignite ignite2 = ignite(2);
@@ -184,7 +196,7 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
assertTrue(waitForCondition(
() -> isCurrentBaselineFromOneNode(ignite2),
- AUTO_ADJUST_TIMEOUT
+ autoAdjustTimeout
));
}
@@ -197,7 +209,7 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
ignite0.cluster().active(true);
- ignite0.cluster().baselineAutoAdjustTimeout(AUTO_ADJUST_TIMEOUT);
+ ignite0.cluster().baselineAutoAdjustTimeout(autoAdjustTimeout);
assertTrue(isCurrentBaselineFromOneNode(ignite0));
@@ -207,7 +219,7 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
assertTrue(waitForCondition(
() -> ignite0.cluster().currentBaselineTopology().size() == 2,
- AUTO_ADJUST_TIMEOUT * 2
+ autoAdjustTimeout * 10
));
}
@@ -226,7 +238,7 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
*/
@Test
public void testBaselineAutoAdjustDisabledAfterGridHasLostPart() throws Exception {
- AUTO_ADJUST_TIMEOUT = 0;
+ autoAdjustTimeout = 0;
Ignite ignite0 = startGrids(2);
@@ -236,7 +248,7 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
.map(BaselineNode::consistentId)
.collect(Collectors.toSet());
- ignite0.cluster().baselineAutoAdjustTimeout(AUTO_ADJUST_TIMEOUT);
+ ignite0.cluster().baselineAutoAdjustTimeout(autoAdjustTimeout);
IgniteCache<Object, Object> cache = ignite0.getOrCreateCache(new CacheConfiguration<>(TEST_NAME)
.setBackups(0)
@@ -254,7 +266,13 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
.map(BaselineNode::consistentId)
.collect(Collectors.toSet());
- assertEquals(initBaseline, baselineAfterNodeLeft);
+ if (isPersistent())
+ assertEquals(initBaseline, baselineAfterNodeLeft);
+ else {
+ assertThat(initBaseline, is(not(equalTo(baselineAfterNodeLeft))));
+
+ assertEquals(1, baselineAfterNodeLeft.size());
+ }
}
/**
@@ -266,7 +284,7 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
ignite0.cluster().active(true);
- ignite0.cluster().baselineAutoAdjustTimeout(AUTO_ADJUST_TIMEOUT);
+ ignite0.cluster().baselineAutoAdjustTimeout(autoAdjustTimeout);
Collection<BaselineNode> baselineNodes = ignite0.cluster().currentBaselineTopology();
@@ -282,13 +300,15 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
*/
@Test
public void testBaselineAutoAdjustTriggeredAfterFirstEventRegardlessInitBaseline() throws Exception {
- AUTO_ADJUST_TIMEOUT = 3000;
+ assumeTrue(isPersistent());
+
+ autoAdjustTimeout = 3000;
Ignite ignite0 = startGrids(3);
ignite0.cluster().active(true);
- ignite0.cluster().baselineAutoAdjustTimeout(AUTO_ADJUST_TIMEOUT);
+ ignite0.cluster().baselineAutoAdjustTimeout(autoAdjustTimeout);
assertTrue(ignite0.cluster().isBaselineAutoAdjustEnabled());
@@ -313,9 +333,8 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
Ignite finalIgnite = ignite0;
assertTrue(waitForCondition(
- () -> isCurrentBaselineFromOneNode(finalIgnite)
- ,
- AUTO_ADJUST_TIMEOUT * 2
+ () -> isCurrentBaselineFromOneNode(finalIgnite),
+ autoAdjustTimeout * 2
));
}
@@ -329,17 +348,17 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
ignite0.cluster().active(true);
- ignite0.cluster().baselineAutoAdjustTimeout(AUTO_ADJUST_TIMEOUT);
+ ignite0.cluster().baselineAutoAdjustTimeout(autoAdjustTimeout);
assertTrue(ignite0.cluster().isBaselineAutoAdjustEnabled());
stopGrid(1);
- doSleep(AUTO_ADJUST_TIMEOUT / 2);
+ doSleep(autoAdjustTimeout / 2);
IgniteEx igniteClient = startGrid(getConfiguration(getTestIgniteInstanceName(2)).setClientMode(true));
- doSleep(AUTO_ADJUST_TIMEOUT / 2);
+ doSleep(autoAdjustTimeout / 2);
igniteClient.close();
@@ -351,6 +370,8 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest {
*/
@Test
public void testBaselineAutoAdjustDisableByDefaultBecauseNotNewCluster() throws Exception {
+ assumeTrue(isPersistent());
+
// It emulates a working cluster from before the auto-adjust feature was available.
System.setProperty(IGNITE_BASELINE_AUTO_ADJUST_ENABLED, "false");
try {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java
index 201626d..63f9c53 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java
@@ -270,73 +270,6 @@ public class DistributedMetaStoragePersistentTest extends DistributedMetaStorage
* @throws Exception If failed.
*/
@Test
- public void testUnstableTopology() throws Exception {
- int cnt = 8;
-
- startGridsMultiThreaded(cnt);
-
- grid(0).cluster().active(true);
-
- stopGrid(0);
-
- startGrid(0);
-
- AtomicInteger gridIdxCntr = new AtomicInteger(0);
-
- AtomicBoolean stop = new AtomicBoolean();
-
- IgniteInternalFuture<?> fut = multithreadedAsync(() -> {
- int gridIdx = gridIdxCntr.incrementAndGet();
-
- try {
- while (!stop.get()) {
- stopGrid(gridIdx, true);
-
- Thread.sleep(100L);
-
- startGrid(gridIdx);
-
- Thread.sleep(100L);
- }
- }
- catch (Exception e) {
- log.error(e.getMessage(), e);
- }
- }, cnt - 1);
-
- long start = System.currentTimeMillis();
-
- long duration = GridTestUtils.SF.applyLB(15_000, 5_000);
-
- try {
- for (int i = 0; System.currentTimeMillis() < start + duration; i++) {
- metastorage(0).write(
- "key" + i, Integer.toString(ThreadLocalRandom.current().nextInt(1000))
- );
- }
- }
- finally {
- stop.set(true);
-
- fut.get();
- }
-
- awaitPartitionMapExchange();
-
- for (int i = 0; i < cnt; i++) {
- DistributedMetaStorage distributedMetastorage = metastorage(i);
-
- assertNull(U.field(distributedMetastorage, "startupExtras"));
- }
-
- for (int i = 1; i < cnt; i++)
- assertDistributedMetastoragesAreEqual(grid(0), grid(i));
- }
-
- /**
- * @throws Exception If failed.
- */
- @Test
@WithSystemProperty(key = IGNITE_GLOBAL_METASTORAGE_HISTORY_MAX_BYTES, value = "0")
public void testWrongStartOrder1() throws Exception {
int cnt = 4;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java
index c50b2c3..88b9461 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java
@@ -21,6 +21,8 @@ import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Comparator;
import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
@@ -28,6 +30,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.failure.FailureHandler;
import org.apache.ignite.failure.StopNodeFailureHandler;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;
@@ -390,6 +393,73 @@ public class DistributedMetaStorageTest extends GridCommonAbstractTest {
assertEquals("value2", metastorage(1).read("key2"));
}
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testUnstableTopology() throws Exception {
+ int cnt = 8;
+
+ startGridsMultiThreaded(cnt);
+
+ grid(0).cluster().active(true);
+
+ stopGrid(0);
+
+ startGrid(0);
+
+ AtomicInteger gridIdxCntr = new AtomicInteger(0);
+
+ AtomicBoolean stop = new AtomicBoolean();
+
+ IgniteInternalFuture<?> fut = multithreadedAsync(() -> {
+ int gridIdx = gridIdxCntr.incrementAndGet();
+
+ try {
+ while (!stop.get()) {
+ stopGrid(gridIdx, true);
+
+ Thread.sleep(100L);
+
+ startGrid(gridIdx);
+
+ Thread.sleep(100L);
+ }
+ }
+ catch (Exception e) {
+ log.error(e.getMessage(), e);
+ }
+ }, cnt - 1);
+
+ long start = System.currentTimeMillis();
+
+ long duration = GridTestUtils.SF.applyLB(15_000, 5_000);
+
+ try {
+ for (int i = 0; System.currentTimeMillis() < start + duration; i++) {
+ metastorage(0).write(
+ "key" + i, Integer.toString(ThreadLocalRandom.current().nextInt(1000))
+ );
+ }
+ }
+ finally {
+ stop.set(true);
+
+ fut.get();
+ }
+
+ awaitPartitionMapExchange();
+
+ for (int i = 0; i < cnt; i++) {
+ DistributedMetaStorage distributedMetastorage = metastorage(i);
+
+ assertNull(U.field(distributedMetastorage, "startupExtras"));
+ }
+
+ for (int i = 1; i < cnt; i++)
+ assertDistributedMetastoragesAreEqual(grid(0), grid(i));
+ }
+
/** */
protected IgniteEx startClient(int idx) throws Exception {
return startGrid(getConfiguration(getTestIgniteInstanceName(idx)).setClientMode(true));
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentOutsideBaselineTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentOutsideBaselineTest.java
index b693c57..7200dd2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentOutsideBaselineTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentOutsideBaselineTest.java
@@ -34,7 +34,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.services.ServiceConfiguration;
import org.apache.ignite.testframework.GridTestUtils;
-import org.apache.ignite.testframework.config.GridTestProperties;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
@@ -111,6 +111,7 @@ public class ServiceDeploymentOutsideBaselineTest extends GridCommonAbstractTest
* @throws Exception If failed.
*/
@Test
+ @WithSystemProperty(key = IgniteSystemProperties.IGNITE_BASELINE_AUTO_ADJUST_ENABLED, value = "false")
public void testDeployOutsideBaselineNoPersistence() throws Exception {
checkDeploymentFromOutsideNode(false, false);
}
@@ -127,6 +128,7 @@ public class ServiceDeploymentOutsideBaselineTest extends GridCommonAbstractTest
* @throws Exception If failed.
*/
@Test
+ @WithSystemProperty(key = IgniteSystemProperties.IGNITE_BASELINE_AUTO_ADJUST_ENABLED, value = "false")
public void testDeployOutsideBaselineStaticNoPersistence() throws Exception {
checkDeploymentFromOutsideNode(false, true);
}
@@ -175,6 +177,7 @@ public class ServiceDeploymentOutsideBaselineTest extends GridCommonAbstractTest
* @throws Exception If failed.
*/
@Test
+ @WithSystemProperty(key = IgniteSystemProperties.IGNITE_BASELINE_AUTO_ADJUST_ENABLED, value = "false")
public void testDeployFromEachNodes() throws Exception {
checkDeployFromEachNodes(false, false);
}
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/IgniteDiscoveryCacheReuseSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/IgniteDiscoveryCacheReuseSelfTest.java
index 36ffe9e..0783fde 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/IgniteDiscoveryCacheReuseSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/IgniteDiscoveryCacheReuseSelfTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.spi.discovery;
import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -26,6 +27,7 @@ import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
@@ -46,6 +48,7 @@ public class IgniteDiscoveryCacheReuseSelfTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
@Test
+ @WithSystemProperty(key = IgniteSystemProperties.IGNITE_BASELINE_AUTO_ADJUST_ENABLED, value = "false")
public void testDiscoCacheReuseOnNodeJoin() throws Exception {
startGridsMultiThreaded(2);
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
index bc20229..ac01b0f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
@@ -47,13 +47,13 @@ import org.apache.ignite.internal.managers.discovery.GridDiscoveryManagerAliveCa
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManagerAttributesSelfTest;
import org.apache.ignite.internal.managers.discovery.IgniteTopologyPrintFormatSelfTest;
import org.apache.ignite.internal.managers.events.GridEventStorageManagerSelfTest;
+import org.apache.ignite.internal.processors.cluster.BaselineAutoAdjustInMemoryTest;
import org.apache.ignite.internal.processors.cluster.BaselineAutoAdjustTest;
import org.apache.ignite.internal.processors.cluster.GridAddressResolverSelfTest;
import org.apache.ignite.internal.processors.cluster.GridUpdateNotifierSelfTest;
import org.apache.ignite.internal.processors.port.GridPortProcessorSelfTest;
import org.apache.ignite.internal.util.GridStartupWithUndefinedIgniteHomeSelfTest;
import org.apache.ignite.spi.communication.GridCacheMessageSelfTest;
-
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
@@ -90,6 +90,7 @@ import org.junit.runners.Suite;
IgniteConcurrentEntryProcessorAccessStopTest.class,
GridUpdateNotifierSelfTest.class,
GridAddressResolverSelfTest.class,
+ BaselineAutoAdjustInMemoryTest.class,
BaselineAutoAdjustTest.class,
IgniteUpdateNotifierPerClusterSettingSelfTest.class,
GridLocalEventListenerSelfTest.class,
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 04f1586..06761a3 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -193,9 +193,6 @@ public class ZookeeperDiscoveryImpl {
/** */
private final ZookeeperDiscoveryStatistics stats;
- /** Last listener future. */
- private IgniteFuture<?> lastCustomEvtLsnrFut;
-
/**
* @param spi Discovery SPI.
* @param igniteInstanceName Instance name.
@@ -2807,8 +2804,6 @@ public class ZookeeperDiscoveryImpl {
private boolean processBulkJoin(ZkDiscoveryEventsData evtsData, ZkDiscoveryNodeJoinEventData evtData)
throws Exception
{
- waitForLastListenerFuture();
-
boolean evtProcessed = false;
for (int i = 0; i < evtData.joinedNodes.size(); i++) {
@@ -3487,8 +3482,6 @@ public class ZookeeperDiscoveryImpl {
if (msg != null && msg.isMutable())
fut.get();
- else
- lastCustomEvtLsnrFut = fut;
}
/**
@@ -4046,20 +4039,6 @@ public class ZookeeperDiscoveryImpl {
}
/**
- * Wait for all the listeners from previous discovery message to be completed.
- */
- private void waitForLastListenerFuture() {
- if (lastCustomEvtLsnrFut != null) {
- try {
- lastCustomEvtLsnrFut.get();
- }
- finally {
- lastCustomEvtLsnrFut = null;
- }
- }
- }
-
- /**
*
*/
private class ReconnectClosure implements Runnable {