You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dg...@apache.org on 2019/01/18 08:20:44 UTC
[ignite] branch master updated: IGNITE-8573 Save baseline
auto-adjust parameters to metastore - Fixes #5806.
This is an automated email from the ASF dual-hosted git repository.
dgovorukhin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 72fdb15 IGNITE-8573 Save baseline auto-adjust parameters to metastore - Fixes #5806.
72fdb15 is described below
commit 72fdb1517fe11dfc1be82076f5544077bf9ec829
Author: Anton Kalashnikov <ka...@yandex.ru>
AuthorDate: Fri Jan 18 11:20:01 2019 +0300
IGNITE-8573 Save baseline auto-adjust parameters to metastore - Fixes #5806.
Signed-off-by: Dmitriy Govorukhin <dm...@gmail.com>
---
.../main/java/org/apache/ignite/IgniteCluster.java | 8 +
.../ignite/configuration/IgniteConfiguration.java | 87 ++++++-
.../apache/ignite/internal/GridKernalContext.java | 8 +
.../ignite/internal/GridKernalContextImpl.java | 12 +
.../org/apache/ignite/internal/IgniteKernal.java | 2 +
.../cluster/DistributedBaselineConfiguration.java | 109 ++++++++
.../internal/cluster/IgniteClusterAsyncImpl.java | 5 +
.../ignite/internal/cluster/IgniteClusterImpl.java | 50 ++--
.../wal/reader/StandaloneGridKernalContext.java | 6 +
.../distributed/DetachedPropertyException.java | 34 +++
.../distributed/DistributedBooleanProperty.java | 41 +++
.../distributed/DistributedComparableProperty.java | 63 +++++
.../DistributedConfigurationLifecycleListener.java | 29 +++
.../DistributedConfigurationProcessor.java | 281 +++++++++++++++++++++
.../distributed/DistributedLongProperty.java | 41 +++
.../distributed/DistributedProperty.java | 118 +++++++++
.../distributed/DistributedPropertyDispatcher.java | 53 ++++
.../ReadOnlyDistributedMetaStorageBridge.java | 2 +-
.../platform/utils/PlatformConfigurationUtils.java | 12 +
.../GridInternalSubscriptionProcessor.java | 32 ++-
.../util/lang/IgniteThrowableBiConsumer.java | 38 +++
.../org.apache.ignite.plugin.PluginProvider | 1 +
.../distributed/DistributedConfigurationTest.java | 242 ++++++++++++++++++
.../TestDistibutedConfigurationPlugin.java | 119 +++++++++
.../junits/multijvm/IgniteClusterProcessProxy.java | 6 +
.../ApiParity/ClusterParityTest.cs | 3 +-
.../IgniteConfigurationTest.cs | 4 +
.../Apache.Ignite.Core/IgniteConfiguration.cs | 60 +++++
.../IgniteConfigurationSection.xsd | 15 ++
29 files changed, 1443 insertions(+), 38 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java b/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java
index fc0e81b..5d6b9f7 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java
@@ -26,6 +26,7 @@ import org.apache.ignite.cluster.BaselineNode;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterStartNodeResult;
+import org.apache.ignite.internal.cluster.DistributedBaselineConfiguration;
import org.apache.ignite.lang.IgniteAsyncSupport;
import org.apache.ignite.lang.IgniteAsyncSupported;
import org.apache.ignite.lang.IgniteFuture;
@@ -530,4 +531,11 @@ public interface IgniteCluster extends ClusterGroup, IgniteAsyncSupport {
* @see #enableWal(String)
*/
public boolean isWalEnabled(String cacheName);
+
+ /**
+ * All distributed properties of baseline.
+ *
+ * @return Distributed baseline configuration.
+ */
+ public DistributedBaselineConfiguration baselineConfiguration();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index 9498c5b..00178e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -17,11 +17,6 @@
package org.apache.ignite.configuration;
-import java.io.Serializable;
-import java.lang.management.ManagementFactory;
-import java.util.Map;
-import java.util.UUID;
-import java.util.zip.Deflater;
import javax.cache.configuration.Factory;
import javax.cache.event.CacheEntryListener;
import javax.cache.expiry.ExpiryPolicy;
@@ -29,6 +24,11 @@ import javax.cache.integration.CacheLoader;
import javax.cache.processor.EntryProcessor;
import javax.management.MBeanServer;
import javax.net.ssl.SSLContext;
+import java.io.Serializable;
+import java.lang.management.ManagementFactory;
+import java.util.Map;
+import java.util.UUID;
+import java.util.zip.Deflater;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
@@ -225,6 +225,18 @@ public class IgniteConfiguration {
/** Default time interval between MVCC vacuum runs in milliseconds. */
public static final long DFLT_MVCC_VACUUM_FREQUENCY = 5000;
+ /** Default of initial value of manual baseline control or auto adjusting baseline. */
+ public static final boolean DFLT_INIT_BASELINE_AUTO_ADJUST_ENABLED = false;
+
+ /**
+ * Initial value of time which we would wait before the actual topology change since last discovery event(node
+ * join/exit).
+ */
+ public static final long DFLT_INIT_BASELINE_AUTO_ADJUST_TIMEOUT = 0;
+
+ /** Initial value of time which we would wait from the first discovery event in the chain(node join/exit). */
+ public static final long DFLT_INIT_BASELINE_AUTO_ADJUST_MAX_TIMEOUT = 0;
+
/** Optional local Ignite instance name. */
private String igniteInstanceName;
@@ -524,6 +536,18 @@ public class IgniteConfiguration {
/** SQL schemas to be created on node start. */
private String[] sqlSchemas;
+ /** Initial value of manual baseline control or auto adjusting baseline. */
+ private boolean initBaselineAutoAdjustEnabled = DFLT_INIT_BASELINE_AUTO_ADJUST_ENABLED;
+
+ /**
+ * Initial value of time which we would wait before the actual topology change since last discovery event(node
+ * join/exit).
+ */
+ private long initBaselineAutoAdjustTimeout = DFLT_INIT_BASELINE_AUTO_ADJUST_TIMEOUT;
+
+ /** Initial value of time which we would wait from the first discovery event in the chain(node join/exit). */
+ private long initBaselineAutoAdjustMaxTimeout = DFLT_INIT_BASELINE_AUTO_ADJUST_MAX_TIMEOUT;
+
/**
* Creates valid grid configuration with all default values.
*/
@@ -3167,6 +3191,59 @@ public class IgniteConfiguration {
return this;
}
+ /**
+ * Gets initial value of manual baseline control or auto adjusting baseline. This value would be used only if it
+ * have not been changed earlier in real time.
+ *
+ * @return {@code true} if auto adjusting baseline enabled.
+ */
+ public boolean isInitBaselineAutoAdjustEnabled() {
+ return initBaselineAutoAdjustEnabled;
+ }
+
+ /**
+ * Sets initial value of manual baseline control or auto adjusting baseline.
+ */
+ public void setInitBaselineAutoAdjustEnabled(boolean initBaselineAutoAdjustEnabled) {
+ this.initBaselineAutoAdjustEnabled = initBaselineAutoAdjustEnabled;
+ }
+
+ /**
+ * Gets initial value of time which we would wait before the actual topology change. But it would be reset if new
+ * discovery event happened. (node join/exit). This value would be used only if it have not been changed earlier in
+ * real time.
+ *
+ * @return Timeout of wait the actual topology change.
+ */
+ public long getInitBaselineAutoAdjustTimeout() {
+ return initBaselineAutoAdjustTimeout;
+ }
+
+ /**
+ * Sets initial value of time which we would wait before the actual topology change.
+ */
+ public void setInitBaselineAutoAdjustTimeout(long initBaselineAutoAdjustTimeout) {
+ this.initBaselineAutoAdjustTimeout = initBaselineAutoAdjustTimeout;
+ }
+
+ /**
+ * Gets initial value of time which we would wait from the first discovery event in the chain. If we achieved it
+ * than we would change BLAT right away (no matter were another node join/exit happened or not). This value would be
+ * used only if it have not been changed earlier in real time.
+ *
+ * @return Timeout of wait the actual topology change.
+ */
+ public long getInitBaselineAutoAdjustMaxTimeout() {
+ return initBaselineAutoAdjustMaxTimeout;
+ }
+
+ /**
+ * Sets initial value of time which we would wait from the first discovery event in the chain.
+ */
+ public void setInitBaselineAutoAdjustMaxTimeout(long initBaselineAutoAdjustMaxTimeout) {
+ this.initBaselineAutoAdjustMaxTimeout = initBaselineAutoAdjustMaxTimeout;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(IgniteConfiguration.class, this);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index 9651290..744f858 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -54,6 +54,7 @@ import org.apache.ignite.internal.processors.igfs.IgfsProcessorAdapter;
import org.apache.ignite.internal.processors.job.GridJobProcessor;
import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor;
import org.apache.ignite.internal.processors.marshaller.GridMarshallerMappingProcessor;
+import org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationProcessor;
import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage;
import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
import org.apache.ignite.internal.processors.platform.PlatformProcessor;
@@ -214,6 +215,13 @@ public interface GridKernalContext extends Iterable<GridComponent> {
public DistributedMetaStorage distributedMetastorage();
/**
+ * Gets distributed configuration processor.
+ *
+ * @return Distributed configuration processor.
+ */
+ public DistributedConfigurationProcessor distributedConfiguration();
+
+ /**
* Gets task session processor.
*
* @return Session processor.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index cc18d49..85e02f9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -72,6 +72,7 @@ import org.apache.ignite.internal.processors.igfs.IgfsProcessorAdapter;
import org.apache.ignite.internal.processors.job.GridJobProcessor;
import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor;
import org.apache.ignite.internal.processors.marshaller.GridMarshallerMappingProcessor;
+import org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationProcessor;
import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage;
import org.apache.ignite.internal.processors.nodevalidation.DiscoveryNodeValidationProcessor;
import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
@@ -223,6 +224,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
@GridToStringInclude
private DistributedMetaStorage distributedMetastorage;
+ /** Global metastorage. */
+ @GridToStringInclude
+ private DistributedConfigurationProcessor distributedConfigurationProcessor;
+
/** */
@GridToStringInclude
private GridTaskSessionProcessor sesProc;
@@ -609,6 +614,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
stateProc = (GridClusterStateProcessor)comp;
else if (comp instanceof DistributedMetaStorage)
distributedMetastorage = (DistributedMetaStorage)comp;
+ else if (comp instanceof DistributedConfigurationProcessor)
+ distributedConfigurationProcessor = (DistributedConfigurationProcessor)comp;
else if (comp instanceof GridTaskSessionProcessor)
sesProc = (GridTaskSessionProcessor)comp;
else if (comp instanceof GridPortProcessor)
@@ -764,6 +771,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
}
/** {@inheritDoc} */
+ @Override public DistributedConfigurationProcessor distributedConfiguration() {
+ return distributedConfigurationProcessor;
+ }
+
+ /** {@inheritDoc} */
@Override public GridTaskSessionProcessor session() {
return sesProc;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 30af4f4..e427810 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -149,6 +149,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopProcessorAdapter;
import org.apache.ignite.internal.processors.job.GridJobProcessor;
import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor;
import org.apache.ignite.internal.processors.marshaller.GridMarshallerMappingProcessor;
+import org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationProcessor;
import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
import org.apache.ignite.internal.processors.nodevalidation.DiscoveryNodeValidationProcessor;
import org.apache.ignite.internal.processors.nodevalidation.OsDiscoveryNodeValidationProcessor;
@@ -1037,6 +1038,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
startProcessor(createComponent(PlatformProcessor.class, ctx));
startProcessor(new GridMarshallerMappingProcessor(ctx));
startProcessor(new DistributedMetaStorageImpl(ctx));
+ startProcessor(new DistributedConfigurationProcessor(ctx));
// Start plugins.
for (PluginProvider provider : ctx.plugins().allProviders()) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/DistributedBaselineConfiguration.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/DistributedBaselineConfiguration.java
new file mode 100644
index 0000000..99d8929
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/DistributedBaselineConfiguration.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.configuration.distributed.DistributedBooleanProperty;
+import org.apache.ignite.internal.processors.configuration.distributed.DistributedLongProperty;
+import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
+
+import static org.apache.ignite.internal.processors.configuration.distributed.DistributedBooleanProperty.detachedProperty;
+import static org.apache.ignite.internal.processors.configuration.distributed.DistributedLongProperty.detachedProperty;
+
+/**
+ * Distributed baseline configuration.
+ */
+public class DistributedBaselineConfiguration {
+ /** Value of manual baseline control or auto adjusting baseline. */
+ private DistributedBooleanProperty baselineAutoAdjustEnabled;
+
+ /**
+ * Value of time which we would wait before the actual topology change since last discovery event(node join/exit).
+ */
+ private DistributedLongProperty baselineAutoAdjustTimeout;
+
+ /** Value of time which we would wait from the first discovery event in the chain(node join/exit). */
+ private DistributedLongProperty baselineAutoAdjustMaxTimeout;
+
+ /**
+ * @param cfg Static config.
+ * @param isp Subscription processor.
+ */
+ public DistributedBaselineConfiguration(IgniteConfiguration cfg, GridInternalSubscriptionProcessor isp) {
+ baselineAutoAdjustEnabled = detachedProperty("baselineAutoAdjustEnabled", cfg.isInitBaselineAutoAdjustEnabled());
+ baselineAutoAdjustTimeout = detachedProperty("baselineAutoAdjustTimeout", cfg.getInitBaselineAutoAdjustTimeout());
+ baselineAutoAdjustMaxTimeout = detachedProperty("baselineAutoAdjustMaxTimeout", cfg.getInitBaselineAutoAdjustMaxTimeout());
+
+ isp.registerDistributedConfigurationListener(
+ dispatcher -> {
+ dispatcher.registerProperty(baselineAutoAdjustEnabled);
+ dispatcher.registerProperty(baselineAutoAdjustTimeout);
+ dispatcher.registerProperty(baselineAutoAdjustMaxTimeout);
+ }
+ );
+ }
+
+ /**
+ * @return Value of manual baseline control or auto adjusting baseline.
+ */
+ public boolean isBaselineAutoAdjustEnabled() {
+ return baselineAutoAdjustEnabled.value();
+ }
+
+ /**
+ * @param baselineAutoAdjustEnabled Value of manual baseline control or auto adjusting baseline.
+ * @throws IgniteCheckedException if failed.
+ */
+ public void setBaselineAutoAdjustEnabled(boolean baselineAutoAdjustEnabled) throws IgniteCheckedException {
+ this.baselineAutoAdjustEnabled.propagate(baselineAutoAdjustEnabled);
+ }
+
+ /**
+ * @return Value of time which we would wait before the actual topology change since last discovery event(node
+ * join/exit).
+ */
+ public long getBaselineAutoAdjustTimeout() {
+ return baselineAutoAdjustTimeout.value();
+ }
+
+ /**
+ * @param baselineAutoAdjustTimeout Value of time which we would wait before the actual topology change since last
+ * discovery event(node join/exit).
+ * @throws IgniteCheckedException If failed.
+ */
+ public void setBaselineAutoAdjustTimeout(long baselineAutoAdjustTimeout) throws IgniteCheckedException {
+ this.baselineAutoAdjustTimeout.propagate(baselineAutoAdjustTimeout);
+ }
+
+ /**
+ * @return Value of time which we would wait from the first discovery event in the chain(node join/exit).
+ */
+ public long getBaselineAutoAdjustMaxTimeout() {
+ return baselineAutoAdjustMaxTimeout.value();
+ }
+
+ /**
+ * @param baselineAutoAdjustMaxTimeout Value of time which we would wait from the first discovery event in the
+ * chain(node join/exit).
+ * @throws IgniteCheckedException If failed.
+ */
+ public void setBaselineAutoAdjustMaxTimeout(long baselineAutoAdjustMaxTimeout) throws IgniteCheckedException {
+ this.baselineAutoAdjustMaxTimeout.propagate(baselineAutoAdjustMaxTimeout);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java
index d79710d..60eec0b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java
@@ -361,6 +361,11 @@ public class IgniteClusterAsyncImpl extends AsyncSupportAdapter<IgniteCluster>
}
/** {@inheritDoc} */
+ @Override public DistributedBaselineConfiguration baselineConfiguration() {
+ return cluster.baselineConfiguration();
+ }
+
+ /** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
cluster = (IgniteClusterImpl)in.readObject();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
index b755258..2f5e63a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
@@ -93,6 +93,9 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus
/** Minimal IgniteProductVersion supporting BaselineTopology */
private static final IgniteProductVersion MIN_BLT_SUPPORTING_VER = IgniteProductVersion.fromString("2.4.0");
+ /** Distributed baseline configuration. */
+ private DistributedBaselineConfiguration distributedBaselineConfiguration;
+
/**
* Required by {@link Externalizable}.
*/
@@ -109,6 +112,8 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus
cfg = ctx.config();
nodeLoc = new ClusterNodeLocalMapImpl(ctx);
+
+ distributedBaselineConfiguration = new DistributedBaselineConfiguration(cfg, ctx.internalSubscriptionProcessor());
}
/** {@inheritDoc} */
@@ -191,8 +196,7 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus
boolean restart,
int timeout,
int maxConn)
- throws IgniteException
- {
+ throws IgniteException {
try {
return startNodesAsync0(file, restart, timeout, maxConn).get();
}
@@ -213,8 +217,7 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus
boolean restart,
int timeout,
int maxConn)
- throws IgniteException
- {
+ throws IgniteException {
try {
return startNodesAsync0(hosts, dflts, restart, timeout, maxConn).get();
}
@@ -371,8 +374,8 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus
}
/**
- * Verifies all nodes in current cluster topology support BaselineTopology feature
- * so compatibilityMode flag is enabled to reset.
+ * Verifies all nodes in current cluster topology support BaselineTopology feature so compatibilityMode flag is
+ * enabled to reset.
*
* @param discoCache
*/
@@ -419,7 +422,8 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus
}
/** */
- @Nullable private Collection<Object> onlineBaselineNodesRequestedForRemoval(Collection<? extends BaselineNode> newBlt) {
+ @Nullable private Collection<Object> onlineBaselineNodesRequestedForRemoval(
+ Collection<? extends BaselineNode> newBlt) {
BaselineTopology blt = ctx.state().clusterState().baselineTopology();
Set<Object> bltConsIds;
@@ -600,10 +604,9 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus
* @see IgniteCluster#startNodes(java.io.File, boolean, int, int)
*/
IgniteInternalFuture<Collection<ClusterStartNodeResult>> startNodesAsync0(File file,
- boolean restart,
- int timeout,
- int maxConn)
- {
+ boolean restart,
+ int timeout,
+ int maxConn) {
A.notNull(file, "file");
A.ensure(file.exists(), "file doesn't exist.");
A.ensure(file.isFile(), "file is a directory.");
@@ -632,8 +635,7 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus
@Nullable Map<String, Object> dflts,
boolean restart,
int timeout,
- int maxConn)
- {
+ int maxConn) {
A.notNull(hosts, "hosts");
guard();
@@ -709,7 +711,7 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus
Collections.<ClusterStartNodeResult>emptyList());
// Exceeding max line width for readability.
- GridCompoundFuture<ClusterStartNodeResult, Collection<ClusterStartNodeResult>> fut =
+ GridCompoundFuture<ClusterStartNodeResult, Collection<ClusterStartNodeResult>> fut =
new GridCompoundFuture<>(CU.<ClusterStartNodeResult>objectsReducer());
AtomicInteger cnt = new AtomicInteger(nodeCallCnt);
@@ -733,12 +735,10 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus
}
/**
- * Gets the all grid nodes that reside on the same physical computer as local grid node.
- * Local grid node is excluded.
- * <p>
- * Detection of the same physical computer is based on comparing set of network interface MACs.
- * If two nodes have the same set of MACs, Ignite considers these nodes running on the same
- * physical computer.
+ * Gets the all grid nodes that reside on the same physical computer as local grid node. Local grid node is
+ * excluded. <p> Detection of the same physical computer is based on comparing set of network interface MACs. If two
+ * nodes have the same set of MACs, Ignite considers these nodes running on the same physical computer.
+ *
* @return Grid nodes that reside on the same physical computer as local grid node.
*/
private Collection<ClusterNode> neighbors() {
@@ -766,9 +766,8 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus
*/
private boolean runNextNodeCallable(final ConcurrentLinkedQueue<StartNodeCallable> queue,
final GridCompoundFuture<ClusterStartNodeResult, Collection<ClusterStartNodeResult>>
- comp,
- final AtomicInteger cnt)
- {
+ comp,
+ final AtomicInteger cnt) {
StartNodeCallable call = queue.poll();
if (call == null)
@@ -825,6 +824,11 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus
}
/** {@inheritDoc} */
+ @Override public DistributedBaselineConfiguration baselineConfiguration() {
+ return distributedBaselineConfiguration;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return "IgniteCluster [igniteInstanceName=" + ctx.igniteInstanceName() + ']';
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
index 72f2f3b..0396c3e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
@@ -69,6 +69,7 @@ import org.apache.ignite.internal.processors.igfs.IgfsProcessorAdapter;
import org.apache.ignite.internal.processors.job.GridJobProcessor;
import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor;
import org.apache.ignite.internal.processors.marshaller.GridMarshallerMappingProcessor;
+import org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationProcessor;
import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage;
import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
import org.apache.ignite.internal.processors.platform.PlatformProcessor;
@@ -307,6 +308,11 @@ public class StandaloneGridKernalContext implements GridKernalContext {
}
/** {@inheritDoc} */
+ @Override public DistributedConfigurationProcessor distributedConfiguration() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
@Override public GridTaskSessionProcessor session() {
return null;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DetachedPropertyException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DetachedPropertyException.java
new file mode 100644
index 0000000..bcd9ae9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DetachedPropertyException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.configuration.distributed;
+
+import org.apache.ignite.IgniteCheckedException;
+
+/**
+ * Exception of distributed property still have not been attached to the processor.
+ */
+public class DetachedPropertyException extends IgniteCheckedException {
+ /** */
+ private static final long serialVersionUID = 0L;
+ /**
+ * @param name Name of detached property.
+ */
+ public DetachedPropertyException(String name) {
+ super("Property '" + name + "' is detached from the processor.");
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedBooleanProperty.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedBooleanProperty.java
new file mode 100644
index 0000000..adf69c1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedBooleanProperty.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.configuration.distributed;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.util.lang.IgniteThrowableBiConsumer;
+
+/**
+ * Implementation of {@link DistributedProperty} for {@link Boolean}.
+ */
+public class DistributedBooleanProperty extends DistributedProperty<Boolean> {
+
+ /** {@inheritDoc} */
+ DistributedBooleanProperty(String name, Boolean val) {
+ super(name, val);
+ }
+
+ /**
+ * @param name Name of property.
+ * @param initVal Initial initVal of property.
+ * @return Property detached from processor.(Distributed updating are not accessable).
+ */
+ public static DistributedBooleanProperty detachedProperty(String name, Boolean initVal) {
+ return new DistributedBooleanProperty(name, initVal);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedComparableProperty.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedComparableProperty.java
new file mode 100644
index 0000000..2207edf
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedComparableProperty.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.configuration.distributed;
+
+import java.io.Serializable;
+import java.util.Objects;
+import org.apache.ignite.internal.util.lang.IgniteThrowableBiConsumer;
+
+/**
+ * Implementation of {@link DistributedProperty} for {@link Comparable}.
+ */
+public class DistributedComparableProperty<T extends Comparable<T> & Serializable> extends DistributedProperty<T> {
+
+ /** {@inheritDoc} */
+ DistributedComparableProperty(String name, T initVal) {
+ super(name, initVal);
+ }
+
+ /** */
+ public boolean equalTo(T other) {
+ return Objects.equals(val, other);
+ }
+
+ /** */
+ public boolean nonEqualTo(T other) {
+ return !Objects.equals(val, other);
+ }
+
+ /** */
+ public boolean lessThan(T other) {
+ return val.compareTo(other) < 0;
+ }
+
+ /** */
+ public boolean lessOrEqualTo(T other) {
+ return val.compareTo(other) <= 0;
+ }
+
+ /** */
+ public boolean greaterThan(T other) {
+ return val.compareTo(other) > 0;
+ }
+
+ /** */
+ public boolean greaterOrEqualTo(T other) {
+ return val.compareTo(other) >= 0;
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedConfigurationLifecycleListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedConfigurationLifecycleListener.java
new file mode 100644
index 0000000..cbdda67
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedConfigurationLifecycleListener.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.configuration.distributed;
+
+/**
+ * Lifecycle listener for distributed configuration.
+ */
+@FunctionalInterface
+public interface DistributedConfigurationLifecycleListener {
+ /**
+ * Notify about processor ready to register properties.
+ */
+ void onReadyToRegister(DistributedPropertyDispatcher dispatcher);
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedConfigurationProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedConfigurationProcessor.java
new file mode 100644
index 0000000..9c8116d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedConfigurationProcessor.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.configuration.distributed;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.GridProcessorAdapter;
+import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage;
+import org.apache.ignite.internal.processors.metastorage.DistributedMetastorageLifecycleListener;
+import org.apache.ignite.internal.processors.metastorage.ReadableDistributedMetaStorage;
+import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
+import org.apache.ignite.internal.util.lang.IgniteThrowableBiConsumer;
+
+import static org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationProcessor.AllowableAction.ACTUALIZE;
+import static org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationProcessor.AllowableAction.CLUSTER_WIDE_UPDATE;
+import static org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationProcessor.AllowableAction.REGISTER;
+
+/**
+ * Processor of distributed configuration.
+ *
+ * This class control lifecycle of actualization {@link DistributedProperty} across whole cluster.
+ */
+public class DistributedConfigurationProcessor extends GridProcessorAdapter implements DistributedPropertyDispatcher {
+ /** Prefix of key for distributed meta storage. */
+ private static final String DIST_CONF_PREFIX = "distrConf";
+
+ /** Properties storage. */
+ private final Map<String, DistributedProperty> props = new ConcurrentHashMap<>();
+
+ /** Global metastorage. */
+ private volatile DistributedMetaStorage distributedMetastorage;
+
+ /** Max allowed action. All action with less ordinal than this also allowed. */
+ private volatile AllowableAction allowableAction = REGISTER;
+
+ /**
+ * @param ctx Kernal context.
+ */
+ public DistributedConfigurationProcessor(GridKernalContext ctx) {
+ super(ctx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteCheckedException {
+ GridInternalSubscriptionProcessor isp = ctx.internalSubscriptionProcessor();
+
+ isp.registerDistributedMetastorageListener(new DistributedMetastorageLifecycleListener() {
+ @Override public void onReadyForRead(ReadableDistributedMetaStorage metastorage) {
+ distributedMetastorage = ctx.distributedMetastorage();
+
+ //Listener for handling of cluster wide change of specific properties. Do local update.
+ distributedMetastorage.listen(
+ (key) -> key.startsWith(DIST_CONF_PREFIX),
+ (String key, Serializable oldVal, Serializable newVal) -> {
+ DistributedProperty prop = props.get(toPropertyKey(key));
+
+ if (prop != null)
+ prop.localUpdate(newVal);
+ }
+ );
+
+ //Switch to actualize action and actualize already registered properties.
+ switchCurrentActionTo(ACTUALIZE);
+
+ //Register and actualize properties waited for this service.
+ isp.getDistributedConfigurationListeners()
+ .forEach(listener -> listener.onReadyToRegister(DistributedConfigurationProcessor.this));
+
+ }
+
+ @Override public void onReadyForWrite(DistributedMetaStorage metastorage) {
+ //Switch to cluster wide update action and do it on already registered properties.
+ switchCurrentActionTo(CLUSTER_WIDE_UPDATE);
+ }
+ });
+ }
+
+ /**
+ * Switching current action to given action and do all actions from old action to new one.
+ *
+ * @param to New action for switching on.
+ */
+ private synchronized void switchCurrentActionTo(AllowableAction to) {
+ AllowableAction oldAct = allowableAction;
+
+ assert oldAct.ordinal() <= to.ordinal() : "Current action : " + oldAct + ", new action : " + to;
+
+ allowableAction = to;
+
+ for (AllowableAction action : AllowableAction.values()) {
+ if (action.ordinal() > oldAct.ordinal())
+ props.values().forEach(prop -> doAction(action, prop));
+
+ if (action == to)
+ break;
+ }
+ }
+
+ /**
+ * @param propKey Key of specific property.
+ * @return Property key for meta storage.
+ */
+ private static String toMetaStorageKey(String propKey) {
+ return DIST_CONF_PREFIX + propKey;
+ }
+
+ /**
+ * @param metaStorageKey Key from meta storage.
+ * @return Original property key.
+ */
+ private static String toPropertyKey(String metaStorageKey) {
+ return metaStorageKey.substring(DIST_CONF_PREFIX.length());
+ }
+
+ /**
+ * Register property to processor and attach it if it possible.
+ *
+ * @param prop Property to attach to processor.
+ * @param <T> Type of property value.
+ */
+ @Override public <T extends DistributedProperty> T registerProperty(T prop) {
+ doAllAllowableActions(prop);
+
+ return prop;
+ }
+
+ /**
+ * Get registered property.
+ *
+ * @param <T> Type of property value.
+ */
+ public <T extends DistributedProperty> T getProperty(String name) {
+ return (T)props.get(name);
+ }
+
+ /**
+ * Create and attach new long property.
+ *
+ * @param name Name of property.
+ * @param initVal Initial value of property.
+ * @return Attached new property.
+ */
+ @Override public DistributedLongProperty registerLong(String name, Long initVal) {
+ return registerProperty(new DistributedLongProperty(name, initVal));
+ }
+
+ /**
+ * Create and attach new boolean property.
+ *
+ * @param name Name of property.
+ * @param initVal Initial value of property.
+ * @return Attached new property.
+ */
+ @Override public DistributedBooleanProperty registerBoolean(String name,
+ Boolean initVal) {
+ return registerProperty(new DistributedBooleanProperty(name, initVal));
+ }
+
+ /**
+ * Execute all allowable actions until current action on given property.
+ *
+ * @param prop Property which action should be executed on.
+ */
+ private void doAllAllowableActions(DistributedProperty prop) {
+ for (AllowableAction action : AllowableAction.values()) {
+ doAction(action, prop);
+
+ if (action == allowableAction)
+ break;
+ }
+ }
+
+ /**
+ * Do one given action on given property.
+ *
+ * @param act Action to execute.
+ * @param prop Property which action should be execute on.
+ */
+ private void doAction(AllowableAction act, DistributedProperty prop) {
+ switch (act) {
+ case REGISTER:
+ doRegister(prop);
+ break;
+ case ACTUALIZE:
+ doActualize(prop);
+ break;
+ case CLUSTER_WIDE_UPDATE:
+ doClusterWideUpdate(prop);
+ break;
+ }
+ }
+
+ /**
+ * Do register action on given property.
+ *
+ * Bind property with this processor for furthter actualizing.
+ *
+ * @param prop Property which action should be execute on.
+ */
+ private void doRegister(DistributedProperty prop) {
+ if (props.containsKey(prop.getName()))
+ throw new IllegalArgumentException("Property already exists : " + prop.getName());
+
+ props.put(prop.getName(), prop);
+
+ prop.onAttached();
+ }
+
+ /**
+ * Do actualize action on given property.
+ *
+ * Read actual value from metastore and set it to local property.
+ *
+ * @param prop Property which action should be execute on.
+ */
+ private void doActualize(DistributedProperty prop) {
+ Serializable readVal = null;
+ try {
+ readVal = distributedMetastorage.read(toMetaStorageKey(prop.getName()));
+ }
+ catch (IgniteCheckedException e) {
+ log.error("Can not read value of property '" + prop.getName() + "'", e);
+ }
+
+ if (readVal != null)
+ prop.localUpdate(readVal);
+ }
+
+ /**
+ * Do cluster wide action on given property.
+ *
+ * Set closure for cluster wide update action to given property.
+ *
+ * @param prop Property which action should be execute on.
+ */
+ private void doClusterWideUpdate(DistributedProperty prop) {
+ prop.onReadyForUpdate(
+ (IgniteThrowableBiConsumer<String, Serializable>)(key, value) ->
+ distributedMetastorage.write(toMetaStorageKey(key), value)
+ );
+ }
+
+ /**
+ * This enum determinate what is action allowable for proccessor in current moment.
+ *
+ * Order is important. Each next action allowable all previous actions. Current action can be changed only from
+ * previous to next .
+ */
+ enum AllowableAction {
+ /**
+ * Only registration allowed. Actualization property from metastore and cluster wide update aren't allowed.
+ */
+ REGISTER,
+ /**
+ * Registration and actualization property from metastore are allowed. Cluster wide update isn't allowed.
+ */
+ ACTUALIZE,
+ /**
+ * All of below are allowed.
+ */
+ CLUSTER_WIDE_UPDATE;
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedLongProperty.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedLongProperty.java
new file mode 100644
index 0000000..c25b841
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedLongProperty.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.configuration.distributed;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.util.lang.IgniteThrowableBiConsumer;
+
+/**
+ * Implementation of {@link DistributedProperty} for {@link Long}.
+ */
+public class DistributedLongProperty extends DistributedComparableProperty<Long> {
+
+ /** {@inheritDoc} */
+ DistributedLongProperty(String name, Long initVal) {
+ super(name, initVal);
+ }
+
+ /**
+ * @param name Name of property.
+ * @param initVal Initial initVal of property.
+ * @return Property detached from processor.(Distributed updating are not accessable).
+ */
+ public static DistributedLongProperty detachedProperty(String name, Long initVal) {
+ return new DistributedLongProperty(name, initVal);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedProperty.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedProperty.java
new file mode 100644
index 0000000..af67b55
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedProperty.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.configuration.distributed;
+
+import java.io.Serializable;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.util.lang.IgniteThrowableBiConsumer;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Wrapper of some serializable property providing ability of change this value across whole cluster.
+ */
+public class DistributedProperty<T extends Serializable> {
+ /** Name of property. */
+ private final String name;
+ /** Property value. */
+ protected volatile T val;
+ /** Sign of attachment to the processor. */
+ private volatile boolean attached = false;
+ /**
+ * Specific consumer for update value in cluster. It is null when property doesn't ready to update value on cluster
+ * wide.
+ */
+ @GridToStringExclude
+ private volatile IgniteThrowableBiConsumer<String, Serializable> clusterWideUpdater;
+
+ /**
+ * @param name Name of property.
+ * @param initVal Initial value of property.
+ */
+ public DistributedProperty(String name, T initVal) {
+ this.val = initVal;
+ this.name = name;
+ }
+
+ /**
+ * Change value across whole cluster.
+ *
+ * @param newVal Value which this property should be changed on.
+ * @return {@code true} if value was successfully updated and {@code false} if cluster wide update have not
+ * permitted yet.
+ * @throws DetachedPropertyException If this property have not been attached to processor yet, please call {@link
+ * DistributedConfigurationProcessor#registerProperty(DistributedProperty)} before this method.
+ * @throws IgniteCheckedException If failed during cluster wide update.
+ */
+ public boolean propagate(T newVal) throws IgniteCheckedException {
+ if (!attached)
+ throw new DetachedPropertyException(name);
+
+ if (clusterWideUpdater == null)
+ return false;
+
+ clusterWideUpdater.accept(name, newVal);
+
+ return true;
+ }
+
+ /**
+ * @return Current property value.
+ */
+ public T value() {
+ return val;
+ }
+
+ /**
+ * @return Name of property.
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * This property have been attached to processor.
+ */
+ void onAttached() {
+ attached = true;
+ }
+
+ /**
+ * On this property ready to be update on cluster wide.
+ *
+ * @param updater Consumer for update value across cluster.
+ */
+ void onReadyForUpdate(@NotNull IgniteThrowableBiConsumer<String, Serializable> updater) {
+ this.clusterWideUpdater = updater;
+ }
+
+ /**
+ * Update only local value without updating remote cluster.
+ *
+ * @param newVal New value.
+ */
+ void localUpdate(Serializable newVal) {
+ val = (T)newVal;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DistributedProperty.class, this);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedPropertyDispatcher.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedPropertyDispatcher.java
new file mode 100644
index 0000000..3178f75
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedPropertyDispatcher.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.configuration.distributed;
+
+import org.apache.ignite.IgniteCheckedException;
+
+/**
+ * Dispatcher of distributed properties.
+ *
+ * Hold of all register properties of distributed configuration.
+ */
+public interface DistributedPropertyDispatcher {
+ /**
+ * Attach already created property.
+ *
+ * @param prop Property to attach to processor.
+ * @param <T> Type of property value.
+ */
+ public <T extends DistributedProperty> T registerProperty(T prop);
+
+ /**
+ * Create and attach new long property.
+ *
+ * @param name Name of property.
+ * @param initVal Initial value of property.
+ * @return Attached new property.
+ */
+ public DistributedLongProperty registerLong(String name, Long initVal);
+
+ /**
+ * Create and attach new boolean property.
+ *
+ * @param name Name of property.
+ * @param initVal Initial value of property.
+ * @return Attached new property.
+ */
+ public DistributedBooleanProperty registerBoolean(String name, Boolean initVal);
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/ReadOnlyDistributedMetaStorageBridge.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/ReadOnlyDistributedMetaStorageBridge.java
index 84c955d..ae5261b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/ReadOnlyDistributedMetaStorageBridge.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/ReadOnlyDistributedMetaStorageBridge.java
@@ -42,7 +42,7 @@ class ReadOnlyDistributedMetaStorageBridge implements DistributedMetaStorageBrid
Comparator.comparing(item -> item.key);
/** */
- private DistributedMetaStorageHistoryItem[] locFullData;
+ private DistributedMetaStorageHistoryItem[] locFullData = EMPTY_ARRAY;
/** */
private DistributedMetaStorageVersion ver;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
index fada9d1..11ff87e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
@@ -661,6 +661,12 @@ public class PlatformConfigurationUtils {
cfg.setMvccVacuumThreadCount(in.readInt());
if (in.readBoolean())
cfg.setSystemWorkerBlockedTimeout(in.readLong());
+ if (in.readBoolean())
+ cfg.setInitBaselineAutoAdjustEnabled(in.readBoolean());
+ if (in.readBoolean())
+ cfg.setInitBaselineAutoAdjustTimeout(in.readLong());
+ if (in.readBoolean())
+ cfg.setInitBaselineAutoAdjustMaxTimeout(in.readLong());
int sqlSchemasCnt = in.readInt();
@@ -1250,6 +1256,12 @@ public class PlatformConfigurationUtils {
} else {
w.writeBoolean(false);
}
+ w.writeBoolean(true);
+ w.writeBoolean(cfg.isInitBaselineAutoAdjustEnabled());
+ w.writeBoolean(true);
+ w.writeLong(cfg.getInitBaselineAutoAdjustTimeout());
+ w.writeBoolean(true);
+ w.writeLong(cfg.getInitBaselineAutoAdjustMaxTimeout());
if (cfg.getSqlSchemas() == null)
w.writeInt(-1);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/subscription/GridInternalSubscriptionProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/subscription/GridInternalSubscriptionProcessor.java
index 5e48547..7f89ed1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/subscription/GridInternalSubscriptionProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/subscription/GridInternalSubscriptionProcessor.java
@@ -22,12 +22,14 @@ import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.cache.persistence.DatabaseLifecycleListener;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
+import org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationLifecycleListener;
import org.apache.ignite.internal.processors.metastorage.DistributedMetastorageLifecycleListener;
import org.jetbrains.annotations.NotNull;
+import static java.util.Objects.requireNonNull;
+
/**
- * Processor enables grid components to register listeners for events
- * generated by other components on local node.
+ * Processor enables grid components to register listeners for events generated by other components on local node.
*
* It starts very first during node startup procedure so any components could use it.
*
@@ -43,6 +45,10 @@ public class GridInternalSubscriptionProcessor extends GridProcessorAdapter {
/** */
private final List<DatabaseLifecycleListener> dbListeners = new ArrayList<>();
+ /**
+ * Listeners of distributed configuration controlled by {@link org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationProcessor}.
+ */
+ private List<DistributedConfigurationLifecycleListener> distributedConfigurationListeners = new ArrayList<>();
/**
* @param ctx Kernal context.
@@ -53,8 +59,7 @@ public class GridInternalSubscriptionProcessor extends GridProcessorAdapter {
/** */
public void registerMetastorageListener(@NotNull MetastorageLifecycleListener metastorageListener) {
- if (metastorageListener == null)
- throw new NullPointerException("Metastorage subscriber should be not-null.");
+ requireNonNull(metastorageListener, "Metastorage subscriber should be not-null.");
metastorageListeners.add(metastorageListener);
}
@@ -66,8 +71,7 @@ public class GridInternalSubscriptionProcessor extends GridProcessorAdapter {
/** */
public void registerDistributedMetastorageListener(@NotNull DistributedMetastorageLifecycleListener lsnr) {
- if (lsnr == null)
- throw new NullPointerException("Global metastorage subscriber should be not-null.");
+ requireNonNull(lsnr, "Global metastorage subscriber should be not-null.");
distributedMetastorageListeners.add(lsnr);
}
@@ -79,8 +83,7 @@ public class GridInternalSubscriptionProcessor extends GridProcessorAdapter {
/** */
public void registerDatabaseListener(@NotNull DatabaseLifecycleListener databaseListener) {
- if (databaseListener == null)
- throw new NullPointerException("Database subscriber should be not-null.");
+ requireNonNull(databaseListener, "Database subscriber should be not-null.");
dbListeners.add(databaseListener);
}
@@ -89,4 +92,17 @@ public class GridInternalSubscriptionProcessor extends GridProcessorAdapter {
public List<DatabaseLifecycleListener> getDatabaseListeners() {
return dbListeners;
}
+
+ /** */
+ public void registerDistributedConfigurationListener(
+ @NotNull DistributedConfigurationLifecycleListener lifecycleListener) {
+ requireNonNull(distributedConfigurationListeners, "Distributed configuration subscriber should be not-null.");
+
+ distributedConfigurationListeners.add(lifecycleListener);
+ }
+
+ /** */
+ public List<DistributedConfigurationLifecycleListener> getDistributedConfigurationListeners() {
+ return distributedConfigurationListeners;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableBiConsumer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableBiConsumer.java
new file mode 100644
index 0000000..2733759
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableBiConsumer.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.lang;
+
+import java.io.Serializable;
+import org.apache.ignite.IgniteCheckedException;
+
+/**
+ * Represents an operation that accepts a single input argument and returns no result. Unlike most other functional
+ * interfaces, {@code IgniteThrowableConsumer} is expected to operate via side-effects.
+ *
+ * @param <E> Type of closure parameter.
+ * @param <R> Type of result value.
+ */
+public interface IgniteThrowableBiConsumer<E, R> extends Serializable {
+ /**
+ * Consumer body.
+ *
+ * @param e Consumer parameter.
+ * @throws IgniteCheckedException if body execution was failed.
+ */
+ public void accept(E e, R r) throws IgniteCheckedException;
+}
diff --git a/modules/core/src/test/java/META-INF/services/org.apache.ignite.plugin.PluginProvider b/modules/core/src/test/java/META-INF/services/org.apache.ignite.plugin.PluginProvider
index 1c03b7c5..e9e9d41 100644
--- a/modules/core/src/test/java/META-INF/services/org.apache.ignite.plugin.PluginProvider
+++ b/modules/core/src/test/java/META-INF/services/org.apache.ignite.plugin.PluginProvider
@@ -1,3 +1,4 @@
org.apache.ignite.spi.discovery.tcp.TestReconnectPluginProvider
org.apache.ignite.internal.processors.cache.persistence.standbycluster.IgniteStandByClusterTest$StanByClusterTestProvider
org.apache.ignite.internal.processors.cache.persistence.wal.memtracker.PageMemoryTrackerPluginProvider
+org.apache.ignite.internal.processors.configuration.distributed.TestDistibutedConfigurationPlugin
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedConfigurationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedConfigurationTest.java
new file mode 100644
index 0000000..4e563e9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedConfigurationTest.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.configuration.distributed;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ *
+ */
+@RunWith(JUnit4.class)
+public class DistributedConfigurationTest extends GridCommonAbstractTest {
+ /** */
+ private static final String TEST_PROP = "someLong";
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+
+ super.afterTest();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setConsistentId(igniteInstanceName);
+
+ DataStorageConfiguration storageCfg = new DataStorageConfiguration();
+
+ storageCfg.getDefaultDataRegionConfiguration()
+ .setPersistenceEnabled(true)
+ .setMaxSize(500L * 1024 * 1024);
+
+ cfg.setDataStorageConfiguration(storageCfg);
+
+ return cfg;
+ }
+
+// /**
+// * @throws Exception If failed.
+// */
+// @Test
+// public void test() throws Exception {
+// IgniteEx ignite0 = startGrid(0);
+// IgniteEx ignite1 = startGrid(1);
+//
+// ignite0.cluster().active(true);
+//
+// Assert.assertEquals(0, ignite0.cluster().baselineConfiguration().getBaselineAutoAdjustTimeout());
+// Assert.assertEquals(0, ignite1.cluster().baselineConfiguration().getBaselineAutoAdjustTimeout());
+//
+// ignite0.cluster().baselineConfiguration().setBaselineAutoAdjustTimeout(2);
+//
+// Assert.assertEquals(2, ignite0.cluster().baselineConfiguration().getBaselineAutoAdjustTimeout());
+// Assert.assertEquals(2, ignite1.cluster().baselineConfiguration().getBaselineAutoAdjustTimeout());
+//
+// stopAllGrids();
+//
+// ignite0 = startGrid(0);
+// ignite1 = startGrid(1);
+//
+// ignite0.cluster().active(true);
+//
+// Assert.assertEquals(2, ignite0.cluster().baselineConfiguration().getBaselineAutoAdjustTimeout());
+// Assert.assertEquals(2, ignite1.cluster().baselineConfiguration().getBaselineAutoAdjustTimeout());
+// }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testSuccessClusterWideUpdate() throws Exception {
+ IgniteEx ignite0 = startGrid(0);
+ IgniteEx ignite1 = startGrid(1);
+
+ ignite0.cluster().active(true);
+
+ DistributedLongProperty long0 = ignite0.context().distributedConfiguration().registerLong(TEST_PROP, 0L);
+ DistributedLongProperty long1 = ignite1.context().distributedConfiguration().registerLong(TEST_PROP, 0L);
+
+ assertEquals(0, long0.value().longValue());
+ assertEquals(0, long1.value().longValue());
+
+ assertTrue(long0.propagate(2L));
+
+ //Value changed on whole grid.
+ assertEquals(2L, long0.value().longValue());
+ assertEquals(2L, long1.value().longValue());
+
+ stopAllGrids();
+
+ ignite0 = startGrid(0);
+ ignite1 = startGrid(1);
+
+ ignite0.cluster().active(true);
+
+ long0 = ignite0.context().distributedConfiguration().registerLong(TEST_PROP, 0L);
+ long1 = ignite1.context().distributedConfiguration().registerLong(TEST_PROP, 0L);
+
+ assertEquals(2, long0.value().longValue());
+ assertEquals(2, long1.value().longValue());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testReadLocalValueOnInactiveGrid() throws Exception {
+ IgniteEx ignite0 = startGrid(0);
+ startGrid(1);
+
+ ignite0.cluster().active(true);
+
+ DistributedLongProperty long0 = ignite0.context().distributedConfiguration().registerLong(TEST_PROP, 0L);
+
+ assertEquals(0, long0.value().longValue());
+
+ assertTrue(long0.propagate(2L));
+
+ stopAllGrids();
+
+ ignite0 = startGrid(0);
+
+ long0 = ignite0.context().distributedConfiguration().registerLong(TEST_PROP, 0L);
+
+ assertEquals(2, long0.value().longValue());
+
+ //Cluster wide update have not initialized yet.
+ assertFalse(long0.propagate(3L));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testRegisterExistedProperty() throws Exception {
+ IgniteEx ignite0 = startGrid(0);
+ IgniteEx ignite1 = startGrid(1);
+
+ ignite0.cluster().active(true);
+
+ DistributedLongProperty long0 = ignite0.context().distributedConfiguration().registerLong(TEST_PROP, 0L);
+
+ assertEquals(0, long0.value().longValue());
+
+ assertTrue(long0.propagate(2L));
+
+ DistributedLongProperty long1 = ignite1.context().distributedConfiguration().registerLong(TEST_PROP, 0L);
+
+ //Already changed to 2.
+ assertEquals(2, long1.value().longValue());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test(expected = DetachedPropertyException.class)
+ public void testNotAttachedProperty() throws Exception {
+ DistributedLongProperty long0 = DistributedLongProperty.detachedProperty(TEST_PROP, 0L);
+ assertEquals(0, long0.value().longValue());
+
+ long0.propagate(1L);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testReadInitValueBeforeOnReadyForReady() throws Exception {
+ IgniteEx ignite0 = startGrid(0);
+ IgniteEx ignite1 = startGrid(1);
+
+ ignite0.cluster().active(true);
+
+ DistributedLongProperty long0 = ignite0.context().distributedConfiguration().registerLong(TEST_PROP, 0L);
+
+ assertEquals(0, long0.value().longValue());
+
+ long0.propagate(2L);
+
+ stopAllGrids();
+
+ TestDistibutedConfigurationPlugin.supplier = (ctx) -> {
+ DistributedLongProperty longProperty = null;
+ longProperty = ctx.distributedConfiguration().registerLong(TEST_PROP, -1L);
+
+ //Read init value because onReadyForReady have not happened yet.
+ assertEquals(-1, longProperty.value().longValue());
+
+ try {
+ assertFalse(longProperty.propagate(1L));
+ }
+ catch (IgniteCheckedException e) {
+ throw new RuntimeException(e);
+ }
+ };
+
+ ignite0 = startGrid(0);
+ ignite1 = startGrid(1);
+
+ long0 = ignite0.context().distributedConfiguration().getProperty(TEST_PROP);
+ DistributedLongProperty long1 = ignite1.context().distributedConfiguration().getProperty(TEST_PROP);
+
+ //After start it should read from local storage.
+ assertEquals(2, long0.value().longValue());
+ assertEquals(2, long1.value().longValue());
+ }
+
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/configuration/distributed/TestDistibutedConfigurationPlugin.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/configuration/distributed/TestDistibutedConfigurationPlugin.java
new file mode 100644
index 0000000..d86a3c3
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/configuration/distributed/TestDistibutedConfigurationPlugin.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.configuration.distributed;
+
+import java.io.Serializable;
+import java.util.UUID;
+import java.util.function.Consumer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.plugin.CachePluginContext;
+import org.apache.ignite.plugin.CachePluginProvider;
+import org.apache.ignite.plugin.ExtensionRegistry;
+import org.apache.ignite.plugin.IgnitePlugin;
+import org.apache.ignite.plugin.PluginContext;
+import org.apache.ignite.plugin.PluginProvider;
+import org.apache.ignite.plugin.PluginValidationException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * TODO: Add class description.
+ *
+ * @author @java.author
+ * @version @java.version
+ */
+public class TestDistibutedConfigurationPlugin implements PluginProvider {
+ /** */
+ private GridKernalContext igniteCtx;
+
+ public static Consumer<GridKernalContext> supplier = (ctx) -> {
+ };
+
+ /** {@inheritDoc} */
+ @Override public String name() {
+ return "TestDistibutedConfigurationPlugin";
+ }
+
+ /** {@inheritDoc} */
+ @Override public String version() {
+ return "1.0";
+ }
+
+ /** {@inheritDoc} */
+ @Override public String copyright() {
+ return "";
+ }
+
+ /** {@inheritDoc} */
+ @Override public void initExtensions(PluginContext ctx, ExtensionRegistry registry) {
+ igniteCtx = ((IgniteKernal)ctx.grid()).context();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start(PluginContext ctx) throws IgniteCheckedException {
+ supplier.accept(igniteCtx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop(boolean cancel) throws IgniteCheckedException {
+ // No-op
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onIgniteStart() throws IgniteCheckedException {
+ // No-op
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onIgniteStop(boolean cancel) {
+ // No-op
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Serializable provideDiscoveryData(UUID nodeId) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void receiveDiscoveryData(UUID nodeId, Serializable data) {
+ // No-op
+ }
+
+ /** {@inheritDoc} */
+ @Override public void validateNewNode(ClusterNode node) throws PluginValidationException {
+ // No-op
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Object createComponent(PluginContext ctx, Class cls) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgnitePlugin plugin() {
+ return new IgnitePlugin() {
+ };
+ }
+
+ /** {@inheritDoc} */
+ @Override public CachePluginProvider createCacheProvider(CachePluginContext ctx) {
+ return null;
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java
index 7f07199..d0d3a5a 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java
@@ -32,6 +32,7 @@ import org.apache.ignite.cluster.ClusterMetrics;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterStartNodeResult;
import org.apache.ignite.internal.cluster.ClusterGroupEx;
+import org.apache.ignite.internal.cluster.DistributedBaselineConfiguration;
import org.apache.ignite.internal.cluster.IgniteClusterEx;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteFuture;
@@ -191,6 +192,11 @@ public class IgniteClusterProcessProxy implements IgniteClusterEx {
}
/** {@inheritDoc} */
+ @Override public DistributedBaselineConfiguration baselineConfiguration() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
@Override public boolean isAsync() {
throw new UnsupportedOperationException("Operation is not supported yet.");
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/ClusterParityTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/ClusterParityTest.cs
index 7397863..f96a4c7 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/ClusterParityTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/ClusterParityTest.cs
@@ -32,7 +32,8 @@
"startNodes",
"startNodesAsync",
"stopNodes",
- "restartNodes"
+ "restartNodes",
+ "baselineConfiguration"
};
/** Members that are missing on .NET side and should be added in future. */
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs
index f0f3b7c..2665c25 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs
@@ -262,6 +262,10 @@ namespace Apache.Ignite.Core.Tests
Assert.AreEqual(cfg.MvccVacuumFrequency, resCfg.MvccVacuumFrequency);
Assert.AreEqual(cfg.MvccVacuumThreadCount, resCfg.MvccVacuumThreadCount);
+ Assert.AreEqual(cfg.InitBaselineAutoAdjustEnabled, resCfg.InitBaselineAutoAdjustEnabled);
+ Assert.AreEqual(cfg.InitBaselineAutoAdjustTimeout, resCfg.InitBaselineAutoAdjustTimeout);
+ Assert.AreEqual(cfg.InitBaselineAutoAdjustMaxTimeout, resCfg.InitBaselineAutoAdjustMaxTimeout);
+
Assert.IsNotNull(resCfg.SqlSchemas);
Assert.AreEqual(2, resCfg.SqlSchemas.Count);
Assert.IsTrue(resCfg.SqlSchemas.Contains("SCHEMA_3"));
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs
index 63bf794..987fc21 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs
@@ -216,6 +216,15 @@ namespace Apache.Ignite.Core
/** MVCC vacuum thread count. */
private int? _mvccVacuumThreadCnt;
+ /** */
+ private bool? _initBaselineAutoAdjustEnabled;
+
+ /** Initial value of time which we would wait before the actual topology change since last discovery event. */
+ private long? _initBaselineAutoAdjustTimeout;
+
+ /** Initial value of time which we would wait from the first discovery event in the chain(node join/exit). */
+ private long? _initBaselineAutoAdjustMaxTimeout;
+
/// <summary>
/// Default network retry count.
/// </summary>
@@ -252,6 +261,21 @@ namespace Apache.Ignite.Core
public const int DefaultMvccVacuumThreadCount = 2;
/// <summary>
+ /// Default value for <see cref="InitBaselineAutoAdjustEnabled"/> property.
+ /// </summary>
+ public const bool DefaultInitBaselineAutoAdjustEnabled = false;
+
+ /// <summary>
+ /// Default value for <see cref="InitBaselineAutoAdjustTimeout"/> property.
+ /// </summary>
+ public const long DefaultInitBaselineAutoAdjustTimeout = 0;
+
+ /// <summary>
+ /// Default value for <see cref="InitBaselineAutoAdjustMaxTimeout"/> property.
+ /// </summary>
+ public const long DefaultInitBaselineAutoAdjustMaxTimeout = 0;
+
+ /// <summary>
/// Initializes a new instance of the <see cref="IgniteConfiguration"/> class.
/// </summary>
public IgniteConfiguration()
@@ -333,6 +357,9 @@ namespace Apache.Ignite.Core
writer.WriteLongNullable(_mvccVacuumFreq);
writer.WriteIntNullable(_mvccVacuumThreadCnt);
writer.WriteTimeSpanAsLongNullable(_sysWorkerBlockedTimeout);
+ writer.WriteBooleanNullable(_initBaselineAutoAdjustEnabled);
+ writer.WriteLongNullable(_initBaselineAutoAdjustTimeout);
+ writer.WriteLongNullable(_initBaselineAutoAdjustMaxTimeout);
if (SqlSchemas == null)
writer.WriteInt(-1);
@@ -722,6 +749,9 @@ namespace Apache.Ignite.Core
_mvccVacuumFreq = r.ReadLongNullable();
_mvccVacuumThreadCnt = r.ReadIntNullable();
_sysWorkerBlockedTimeout = r.ReadTimeSpanNullable();
+ _initBaselineAutoAdjustEnabled = r.ReadBooleanNullable();
+ _initBaselineAutoAdjustTimeout = r.ReadLongNullable();
+ _initBaselineAutoAdjustMaxTimeout = r.ReadLongNullable();
int sqlSchemasCnt = r.ReadInt();
@@ -1655,5 +1685,35 @@ namespace Apache.Ignite.Core
/// </summary>
[SuppressMessage("Microsoft.Usage", "CA2227:CollectionPropertiesShouldBeReadOnly")]
public ICollection<string> SqlSchemas { get; set; }
+
+ /// <summary>
+ /// Initial value of manual baseline control or auto adjusting baseline.
+ /// </summary>
+ [DefaultValue(DefaultInitBaselineAutoAdjustEnabled)]
+ public bool InitBaselineAutoAdjustEnabled
+ {
+ get { return _initBaselineAutoAdjustEnabled ?? DefaultInitBaselineAutoAdjustEnabled; }
+ set { _initBaselineAutoAdjustEnabled = value; }
+ }
+
+ /// <summary>
+ /// Initial value of time which we would wait before the actual topology change since last discovery event.
+ /// </summary>
+ [DefaultValue(DefaultInitBaselineAutoAdjustTimeout)]
+ public long InitBaselineAutoAdjustTimeout
+ {
+ get { return _initBaselineAutoAdjustTimeout ?? DefaultInitBaselineAutoAdjustTimeout; }
+ set { _initBaselineAutoAdjustTimeout = value; }
+ }
+
+ /// <summary>
+ /// Initial value of time which we would wait from the first discovery event in the chain(node join/exit).
+ /// </summary>
+ [DefaultValue(DefaultInitBaselineAutoAdjustMaxTimeout)]
+ public long InitBaselineAutoAdjustMaxTimeout
+ {
+ get { return _initBaselineAutoAdjustMaxTimeout ?? DefaultInitBaselineAutoAdjustMaxTimeout; }
+ set { _initBaselineAutoAdjustMaxTimeout = value; }
+ }
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd
index 5f4a439f..efde394 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd
@@ -2337,6 +2337,21 @@
<xs:documentation>Whether Java console output should be redirected to Console.Out and Console.Error.</xs:documentation>
</xs:annotation>
</xs:attribute>
+ <xs:attribute name="initBaselineAutoAdjustEnabled" type="xs:boolean">
+ <xs:annotation>
+ <xs:documentation>Initial value of manual baseline control or auto adjusting baseline.</xs:documentation>
+ </xs:annotation>
+ </xs:attribute>
+ <xs:attribute name="initBaselineAutoAdjustTimeout" type="xs:long">
+ <xs:annotation>
+ <xs:documentation>Initial value of time which we would wait before the actual topology change since last discovery event.</xs:documentation>
+ </xs:annotation>
+ </xs:attribute>
+ <xs:attribute name="initBaselineAutoAdjustMaxTimeout" type="xs:long">
+ <xs:annotation>
+ <xs:documentation>Initial value of time which we would wait from the first discovery event in the chain(node join/exit).</xs:documentation>
+ </xs:annotation>
+ </xs:attribute>
</xs:complexType>
</xs:element>
</xs:schema>