You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/07/16 13:30:05 UTC
[ignite] branch master updated: IGNITE-14728 Changed
IGNITE_PDS_WAL_REBALANCE_THRESHOLD from System property to Distributed
property. Fixes #9248
This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new c767558 IGNITE-14728 Changed IGNITE_PDS_WAL_REBALANCE_THRESHOLD from System property to Distributed property. Fixes #9248
c767558 is described below
commit c7675588f2fb6f91fa27e0b4d91f711fb5f49c93
Author: Eduard Rakhmankulov <er...@gmail.com>
AuthorDate: Fri Jul 16 16:29:09 2021 +0300
IGNITE-14728 Changed IGNITE_PDS_WAL_REBALANCE_THRESHOLD from System property to Distributed property. Fixes #9248
Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
.../util/GridCommandHandlerPropertiesTest.java | 47 ++++++++++++++++++
.../org/apache/ignite/IgniteSystemProperties.java | 2 +
.../GridCacheDatabaseSharedManager.java | 55 +++++++++++++++++++---
3 files changed, 98 insertions(+), 6 deletions(-)
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerPropertiesTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerPropertiesTest.java
index e535b24..b114aba 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerPropertiesTest.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerPropertiesTest.java
@@ -32,6 +32,8 @@ import org.junit.Test;
import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_INVALID_ARGUMENTS;
import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK;
+import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.DFLT_PDS_WAL_REBALANCE_THRESHOLD;
+import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.HISTORICAL_REBALANCE_THRESHOLD_DMS_KEY;
import static org.apache.ignite.testframework.GridTestUtils.assertContains;
/**
@@ -197,4 +199,49 @@ public class GridCommandHandlerPropertiesTest extends GridCommandHandlerClusterB
)
);
}
+
+ /**
+ * Check the set command for property 'history.rebalance.threshold'.
+ */
+ @Test
+ public void testPropertyWalRebalanceThreshold() {
+ assertDistributedPropertyEquals(HISTORICAL_REBALANCE_THRESHOLD_DMS_KEY, DFLT_PDS_WAL_REBALANCE_THRESHOLD);
+
+ int newVal = DFLT_PDS_WAL_REBALANCE_THRESHOLD * 2;
+
+ assertEquals(
+ EXIT_CODE_OK,
+ execute(
+ "--property", "set",
+ "--name", HISTORICAL_REBALANCE_THRESHOLD_DMS_KEY,
+ "--val", Integer.toString(newVal)
+ )
+ );
+
+ assertDistributedPropertyEquals(HISTORICAL_REBALANCE_THRESHOLD_DMS_KEY, newVal);
+ }
+
+ /**
+ * Validates that distributed property has specified value across all nodes.
+ *
+ * @param propName Distributed property name.
+ * @param expected Expected property value.
+ * @param <T> Property type.
+ */
+ private <T extends Serializable> void assertDistributedPropertyEquals(String propName, T expected) {
+ for (Ignite ign : G.allGrids()) {
+ IgniteEx ignEx = (IgniteEx) ign;
+
+ if (ign.configuration().isClientMode())
+ continue;
+
+ DistributedChangeableProperty<Serializable> prop =
+ ignEx.context().distributedConfiguration().property(propName);
+
+ assertEquals(
+ "Validation has failed on the cluster node [name=" + ign.configuration().getIgniteInstanceName(),
+ prop.get(),
+ expected);
+ }
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index e149dbe..7007013 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -1113,7 +1113,9 @@ public final class IgniteSystemProperties {
/**
* WAL rebalance threshold.
+ * @deprecated use Distributed MetaStorage property {@code historical.rebalance.threshold}.
*/
+ @Deprecated
@SystemProperty(value = "WAL rebalance threshold", type = Integer.class,
defaults = "" + DFLT_PDS_WAL_REBALANCE_THRESHOLD)
public static final String IGNITE_PDS_WAL_REBALANCE_THRESHOLD = "IGNITE_PDS_WAL_REBALANCE_THRESHOLD";
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 4cd7cc8..da71924 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -137,6 +137,8 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
import org.apache.ignite.internal.processors.compress.CompressionProcessor;
+import org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationLifecycleListener;
+import org.apache.ignite.internal.processors.configuration.distributed.DistributedPropertyDispatcher;
import org.apache.ignite.internal.processors.configuration.distributed.SimpleDistributedProperty;
import org.apache.ignite.internal.processors.port.GridPortRecord;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
@@ -177,6 +179,8 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_RECOVERY_SEMAPHORE
import static org.apache.ignite.IgniteSystemProperties.getBoolean;
import static org.apache.ignite.IgniteSystemProperties.getInteger;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+import static org.apache.ignite.internal.cluster.DistributedConfigurationUtils.makeUpdateListener;
+import static org.apache.ignite.internal.cluster.DistributedConfigurationUtils.setDefaultValue;
import static org.apache.ignite.internal.pagemem.PageIdUtils.partId;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CHECKPOINT_RECORD;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.MASTER_KEY_CHANGE_RECORD;
@@ -239,15 +243,24 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
*/
private static final double PAGE_LIST_CACHE_LIMIT_THRESHOLD = 0.1;
- /** @see IgniteSystemProperties#IGNITE_PDS_WAL_REBALANCE_THRESHOLD */
+ /**
+ * @see IgniteSystemProperties#IGNITE_PDS_WAL_REBALANCE_THRESHOLD
+ * @see #HISTORICAL_REBALANCE_THRESHOLD_DMS_KEY
+ */
public static final int DFLT_PDS_WAL_REBALANCE_THRESHOLD = 500;
/** @see IgniteSystemProperties#IGNITE_DEFRAGMENTATION_REGION_SIZE_PERCENTAGE */
public static final int DFLT_DEFRAGMENTATION_REGION_SIZE_PERCENTAGE = 60;
- /** */
- private final int walRebalanceThreshold =
- getInteger(IGNITE_PDS_WAL_REBALANCE_THRESHOLD, DFLT_PDS_WAL_REBALANCE_THRESHOLD);
+ /**
+ * Threshold value to use history or full rebalance for local partition.
+ * Master value contained in {@link #historicalRebalanceThreshold}.
+ */
+ private final int walRebalanceThresholdLegacy =
+ getInteger(IGNITE_PDS_WAL_REBALANCE_THRESHOLD, DFLT_PDS_WAL_REBALANCE_THRESHOLD);
+
+ /** WAL rebalance threshold distributed configuration key */
+ public static final String HISTORICAL_REBALANCE_THRESHOLD_DMS_KEY = "historical.rebalance.threshold";
/** Prefer historical rebalance flag. */
private final boolean preferWalRebalance = getBoolean(IGNITE_PREFER_WAL_REBALANCE);
@@ -357,6 +370,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/** Checkpoint frequency deviation. */
private SimpleDistributedProperty<Integer> cpFreqDeviation;
+ /** WAL rebalance threshold. */
+ private final SimpleDistributedProperty<Integer> historicalRebalanceThreshold =
+ new SimpleDistributedProperty<>(HISTORICAL_REBALANCE_THRESHOLD_DMS_KEY, Integer::parseInt);
+
/**
* @param ctx Kernal context.
*/
@@ -524,6 +541,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
assert !kernalCtx.clientNode();
+ initWalRebalanceThreshold();
+
if (!kernalCtx.clientNode()) {
kernalCtx.internalSubscriptionProcessor().registerDatabaseListener(new MetastorageRecoveryLifecycle());
@@ -1671,7 +1690,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
reservedForExchange = null;
grpPartsWithCnts.clear();
-
+
return grpPartsWithCnts;
}
@@ -1758,7 +1777,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
continue;
for (GridDhtLocalPartition locPart : grp.topology().currentLocalPartitions()) {
- if (locPart.state() == OWNING && (preferWalRebalance() || locPart.fullSize() > walRebalanceThreshold))
+ if (locPart.state() == OWNING && (preferWalRebalance() ||
+ locPart.fullSize() > historicalRebalanceThreshold.getOrDefault(walRebalanceThresholdLegacy)))
res.computeIfAbsent(grp.groupId(), k -> new HashSet<>()).add(locPart.id());
}
}
@@ -3655,4 +3675,27 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
return Collections.unmodifiableMap(partitionRecoveryStates);
}
}
+
+ /**
+ * Registers {@link #historicalRebalanceThreshold} property in distributed metastore.
+ */
+ private void initWalRebalanceThreshold() {
+ cctx.kernalContext().internalSubscriptionProcessor().registerDistributedConfigurationListener(
+ new DistributedConfigurationLifecycleListener() {
+ /** {@inheritDoc} */
+ @Override public void onReadyToRegister(DistributedPropertyDispatcher dispatcher) {
+ String logMsgFmt = "Historical rebalance WAL threshold changed [property=%s, oldVal=%s, newVal=%s]";
+
+ historicalRebalanceThreshold.addListener(makeUpdateListener(logMsgFmt, log));
+
+ dispatcher.registerProperties(historicalRebalanceThreshold);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onReadyToWrite() {
+ setDefaultValue(historicalRebalanceThreshold, walRebalanceThresholdLegacy, log);
+ }
+ }
+ );
+ }
}