You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2021/10/25 15:08:08 UTC
[ignite] branch master updated: IGNITE-15308 Settings for
transactions monitoring are stored in metastorage to support propagation to
new nodes and persisting them to disk - Fixes #9329.
This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 53905b4 IGNITE-15308 Settings for transactions monitoring are stored in metastorage to support propagation to new nodes and persisting them to disk - Fixes #9329.
53905b4 is described below
commit 53905b4ad8f24f60283de75b8be480d3712f4be1
Author: denis-chudov <mo...@gmail.com>
AuthorDate: Mon Oct 25 18:00:15 2021 +0300
IGNITE-15308 Settings for transactions monitoring are stored in metastorage to support propagation to new nodes and persisting them to disk - Fixes #9329.
Signed-off-by: Sergey Chugunov <se...@gmail.com>
---
.../DistributedTransactionConfiguration.java | 357 +++++++++++++++++++++
.../cache/LongOperationsDumpSettingsClosure.java | 53 ---
.../LongRunningTxTimeDumpSettingsClosure.java | 73 -----
.../TxOwnerDumpRequestAllowedSettingClosure.java | 56 ----
.../cache/persistence/wal/FileDescriptor.java | 5 +
.../persistence/wal/FileWriteAheadLogManager.java | 3 +-
.../cache/transactions/IgniteTxManager.java | 183 +++++------
.../distributed/DistributedDoubleProperty.java | 36 +++
.../distributed/DistributedIntegerProperty.java | 36 +++
.../apache/ignite/internal/util/IgniteUtils.java | 18 +-
.../main/resources/META-INF/classnames.properties | 1 -
.../internal/TransactionsMXBeanImplTest.java | 300 +++++++++++++----
.../GridTransactionsSystemUserTimeMetricsTest.java | 85 ++++-
.../db/wal/IgniteWalHistoryReservationsTest.java | 4 +-
14 files changed, 843 insertions(+), 367 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/DistributedTransactionConfiguration.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/DistributedTransactionConfiguration.java
new file mode 100644
index 0000000..bc0a02b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/DistributedTransactionConfiguration.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.configuration.distributed.DistributePropertyListener;
+import org.apache.ignite.internal.processors.configuration.distributed.DistributedChangeableProperty;
+import org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationLifecycleListener;
+import org.apache.ignite.internal.processors.configuration.distributed.DistributedPropertyDispatcher;
+import org.apache.ignite.internal.processors.metastorage.ReadableDistributedMetaStorage;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_DUMP_TX_COLLISIONS_INTERVAL;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_LONG_TRANSACTION_TIME_DUMP_THRESHOLD;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_TRANSACTION_TIME_DUMP_SAMPLES_COEFFICIENT;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_TRANSACTION_TIME_DUMP_SAMPLES_PER_SECOND_LIMIT;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_OWNER_DUMP_REQUESTS_ALLOWED;
+import static org.apache.ignite.IgniteSystemProperties.getBoolean;
+import static org.apache.ignite.IgniteSystemProperties.getFloat;
+import static org.apache.ignite.IgniteSystemProperties.getInteger;
+import static org.apache.ignite.IgniteSystemProperties.getLong;
+import static org.apache.ignite.internal.IgniteKernal.DFLT_LONG_OPERATIONS_DUMP_TIMEOUT;
+import static org.apache.ignite.internal.cluster.DistributedConfigurationUtils.makeUpdateListener;
+import static org.apache.ignite.internal.cluster.DistributedConfigurationUtils.setDefaultValue;
+import static org.apache.ignite.internal.processors.configuration.distributed.DistributedBooleanProperty.detachedBooleanProperty;
+import static org.apache.ignite.internal.processors.configuration.distributed.DistributedDoubleProperty.detachedDoubleProperty;
+import static org.apache.ignite.internal.processors.configuration.distributed.DistributedIntegerProperty.detachedIntegerProperty;
+import static org.apache.ignite.internal.processors.configuration.distributed.DistributedLongProperty.detachedLongProperty;
+
+/**
+ * Distributed transaction configuration: transaction monitoring settings held as distributed properties.
+ */
+public class DistributedTransactionConfiguration {
+ /** Template of the message handed to the property update listeners created via {@code makeUpdateListener}. */
+ private static final String PROPERTY_UPDATE_MESSAGE =
+ "Transactions parameter '%s' was changed from '%s' to '%s'";
+
+ /** Default value of {@link #longOperationsDumpTimeout}. */
+ private final long dfltLongOpsDumpTimeout =
+ getLong(IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT, DFLT_LONG_OPERATIONS_DUMP_TIMEOUT);
+
+ /** Default value of {@link #longTransactionTimeDumpThreshold}. */
+ private final long dfltLongTransactionTimeDumpThreshold =
+ getLong(IGNITE_LONG_TRANSACTION_TIME_DUMP_THRESHOLD, 0);
+
+ /** Default value of {@link #transactionTimeDumpSamplesCoefficient}. */
+ private final double dfltTransactionTimeDumpSamplesCoefficient =
+ getFloat(IGNITE_TRANSACTION_TIME_DUMP_SAMPLES_COEFFICIENT, 0.0f);
+
+ /** Default value of {@link #longTransactionTimeDumpSamplesPerSecondLimit}. */
+ private final int dfltLongTransactionTimeDumpSamplesPerSecondLimit =
+ getInteger(IGNITE_TRANSACTION_TIME_DUMP_SAMPLES_PER_SECOND_LIMIT, 5);
+
+ /** Default value of {@link #collisionsDumpInterval}. */
+ private final int dfltCollisionsDumpInterval =
+ getInteger(IGNITE_DUMP_TX_COLLISIONS_INTERVAL, 1000);
+
+ /** Default value of {@link #txOwnerDumpRequestsAllowed}. */
+ private final boolean dfltTxOwnerDumpRequestsAllowed =
+ getBoolean(IGNITE_TX_OWNER_DUMP_REQUESTS_ALLOWED, true);
+
+ /**
+ * Shows if dump requests from local node to near node are allowed, when long running transaction
+ * is found. If allowed, the compute request to near node will be made to get thread dump of transaction
+ * owner thread.
+ */
+ private final DistributedChangeableProperty<Boolean> txOwnerDumpRequestsAllowed =
+ detachedBooleanProperty("txOwnerDumpRequestsAllowed");
+
+ /** Long operations dump timeout. */
+ private final DistributedChangeableProperty<Long> longOperationsDumpTimeout =
+ detachedLongProperty("longOperationsDumpTimeout");
+
+ /**
+ * Threshold timeout for long transactions. If a transaction exceeds it, it will be dumped in log with
+ * information about how much time it spent in system time (time while acquiring locks, preparing,
+ * committing, etc) and user time (time when client node runs some code while holding transaction and not
+ * waiting for it). Equals 0 if not set. No transactions are dumped in log if this parameter is not set.
+ */
+ private final DistributedChangeableProperty<Long> longTransactionTimeDumpThreshold =
+ detachedLongProperty("longTransactionTimeDumpThreshold");
+
+ /** The coefficient for samples of completed transactions that will be dumped in log. */
+ private final DistributedChangeableProperty<Double> transactionTimeDumpSamplesCoefficient =
+ detachedDoubleProperty("transactionTimeDumpSamplesCoefficient");
+
+ /**
+ * The limit of samples of completed transactions that will be dumped in log per second, if
+ * {@link #transactionTimeDumpSamplesCoefficient} is above <code>0.0</code>. Must be integer value
+ * greater than <code>0</code>.
+ */
+ private final DistributedChangeableProperty<Integer> longTransactionTimeDumpSamplesPerSecondLimit =
+ detachedIntegerProperty("longTransactionTimeDumpSamplesPerSecondLimit");
+
+ /** Collisions dump interval. */
+ private final DistributedChangeableProperty<Integer> collisionsDumpInterval =
+ detachedIntegerProperty("collisionsDumpInterval");
+
+ /**
+ * @param ctx Kernal context, used to register the distributed configuration lifecycle listener.
+ * @param log Log for property update messages and for the warning when distributed metastorage is not supported.
+ * @param longOperationsDumpTimeoutLsnr Listener of {@link #longOperationsDumpTimeout} change event.
+ * @param collisionsDumpIntervalLsnr Listener of {@link #collisionsDumpInterval} change event.
+ */
+ public DistributedTransactionConfiguration(
+ GridKernalContext ctx,
+ IgniteLogger log,
+ DistributePropertyListener<Long> longOperationsDumpTimeoutLsnr,
+ DistributePropertyListener<Integer> collisionsDumpIntervalLsnr
+ ) {
+ ctx.internalSubscriptionProcessor().registerDistributedConfigurationListener(
+ new DistributedConfigurationLifecycleListener() {
+ @Override public void onReadyToRegister(DistributedPropertyDispatcher dispatcher) {
+ txOwnerDumpRequestsAllowed.addListener(makeUpdateListener(PROPERTY_UPDATE_MESSAGE, log));
+ longOperationsDumpTimeout.addListener(makeUpdateListener(PROPERTY_UPDATE_MESSAGE, log));
+ longTransactionTimeDumpThreshold.addListener(makeUpdateListener(PROPERTY_UPDATE_MESSAGE, log));
+ transactionTimeDumpSamplesCoefficient.addListener(makeUpdateListener(PROPERTY_UPDATE_MESSAGE, log));
+ longTransactionTimeDumpSamplesPerSecondLimit.addListener(makeUpdateListener(PROPERTY_UPDATE_MESSAGE, log));
+ collisionsDumpInterval.addListener(makeUpdateListener(PROPERTY_UPDATE_MESSAGE, log));
+ longOperationsDumpTimeout.addListener(longOperationsDumpTimeoutLsnr);
+ collisionsDumpInterval.addListener(collisionsDumpIntervalLsnr);
+
+ dispatcher.registerProperties(txOwnerDumpRequestsAllowed, longOperationsDumpTimeout,
+ longTransactionTimeDumpThreshold, transactionTimeDumpSamplesCoefficient,
+ longTransactionTimeDumpSamplesPerSecondLimit, collisionsDumpInterval);
+ }
+
+ @Override public void onReadyToWrite() {
+ if (ReadableDistributedMetaStorage.isSupported(ctx)) {
+ setDefaultValue(
+ longOperationsDumpTimeout,
+ dfltLongOpsDumpTimeout,
+ log);
+ setDefaultValue(
+ longTransactionTimeDumpThreshold,
+ dfltLongTransactionTimeDumpThreshold,
+ log);
+ setDefaultValue(
+ transactionTimeDumpSamplesCoefficient,
+ dfltTransactionTimeDumpSamplesCoefficient,
+ log);
+ setDefaultValue(
+ longTransactionTimeDumpSamplesPerSecondLimit,
+ dfltLongTransactionTimeDumpSamplesPerSecondLimit,
+ log);
+ setDefaultValue(
+ collisionsDumpInterval,
+ dfltCollisionsDumpInterval,
+ log);
+ setDefaultValue(
+ txOwnerDumpRequestsAllowed,
+ dfltTxOwnerDumpRequestsAllowed,
+ log);
+ } else {
+ log.warning("Distributed metastorage is not supported. " +
+ "All distributed transaction configuration parameters are unavailable. " +
+ "Default values will be set.");
+
+ longOperationsDumpTimeout.localUpdate(dfltLongOpsDumpTimeout);
+ longTransactionTimeDumpThreshold.localUpdate(dfltLongTransactionTimeDumpThreshold);
+ transactionTimeDumpSamplesCoefficient.localUpdate(dfltTransactionTimeDumpSamplesCoefficient);
+ longTransactionTimeDumpSamplesPerSecondLimit.localUpdate(dfltLongTransactionTimeDumpSamplesPerSecondLimit);
+ collisionsDumpInterval.localUpdate(dfltCollisionsDumpInterval);
+ txOwnerDumpRequestsAllowed.localUpdate(dfltTxOwnerDumpRequestsAllowed);
+ }
+ }
+ }
+ );
+ }
+
+ /**
+ * Cluster wide update of {@link #longOperationsDumpTimeout}.
+ *
+ * @param timeout New value of {@link #longOperationsDumpTimeout}.
+ * @return Future for {@link #longOperationsDumpTimeout} update operation.
+ * @throws IgniteCheckedException If failed during cluster wide update.
+ */
+ public GridFutureAdapter<?> updateLongOperationsDumpTimeoutAsync(long timeout) throws IgniteCheckedException {
+ return longOperationsDumpTimeout.propagateAsync(timeout);
+ }
+
+ /**
+ * Local update of {@link #longOperationsDumpTimeout}.
+ *
+ * @param timeout New value of {@link #longOperationsDumpTimeout}.
+ */
+ public void updateLongOperationsDumpTimeoutLocal(long timeout) {
+ longOperationsDumpTimeout.localUpdate(timeout);
+ }
+
+ /**
+ * @return Long operations dump timeout. See {@link #longOperationsDumpTimeout}.
+ */
+ public Long longOperationsDumpTimeout() {
+ return longOperationsDumpTimeout.getOrDefault(dfltLongOpsDumpTimeout);
+ }
+
+ /**
+ * Cluster wide update of {@link #longTransactionTimeDumpThreshold}.
+ *
+ * @param longTransactionTimeDumpThreshold New value of threshold timeout in milliseconds.
+ * @return Future for {@link #longTransactionTimeDumpThreshold} update operation.
+ * @throws IgniteCheckedException If failed during cluster wide update.
+ */
+ public GridFutureAdapter<?> updateLongTransactionTimeDumpThresholdAsync(long longTransactionTimeDumpThreshold)
+ throws IgniteCheckedException {
+ return this.longTransactionTimeDumpThreshold.propagateAsync(longTransactionTimeDumpThreshold);
+ }
+
+ /**
+ * Local update of {@link #longTransactionTimeDumpThreshold}.
+ *
+ * @param longTransactionTimeDumpThreshold New value of {@link #longTransactionTimeDumpThreshold}.
+ */
+ public void updateLongTransactionTimeDumpThresholdLocal(long longTransactionTimeDumpThreshold) {
+ this.longTransactionTimeDumpThreshold.localUpdate(longTransactionTimeDumpThreshold);
+ }
+
+ /**
+ * @return Threshold timeout for long transactions. See {@link #longTransactionTimeDumpThreshold}
+ * for more information.
+ */
+ public Long longTransactionTimeDumpThreshold() {
+ return longTransactionTimeDumpThreshold.getOrDefault(dfltLongTransactionTimeDumpThreshold);
+ }
+
+ /**
+ * Cluster wide update of {@link #transactionTimeDumpSamplesCoefficient}.
+ *
+ * @param transactionTimeDumpSamplesCoefficient New coefficient for samples of completed transactions that will be dumped in log.
+ * @return Future for {@link #transactionTimeDumpSamplesCoefficient} update operation.
+ * @throws IgniteCheckedException If failed during cluster wide update.
+ */
+ public GridFutureAdapter<?> updateTransactionTimeDumpSamplesCoefficientAsync(
+ double transactionTimeDumpSamplesCoefficient
+ ) throws IgniteCheckedException {
+ return this.transactionTimeDumpSamplesCoefficient.propagateAsync(transactionTimeDumpSamplesCoefficient);
+ }
+
+ /**
+ * Local update of {@link #transactionTimeDumpSamplesCoefficient}.
+ *
+ * @param transactionTimeDumpSamplesCoefficient New coefficient for samples of completed transactions that will be dumped in log.
+ */
+ public void updateTransactionTimeDumpSamplesCoefficientLocal(double transactionTimeDumpSamplesCoefficient) {
+ this.transactionTimeDumpSamplesCoefficient.localUpdate(transactionTimeDumpSamplesCoefficient);
+ }
+
+ /**
+ * @return The coefficient for samples of completed transactions that will be dumped in log.
+ * See {@link #transactionTimeDumpSamplesCoefficient}.
+ */
+ public Double transactionTimeDumpSamplesCoefficient() {
+ return transactionTimeDumpSamplesCoefficient.getOrDefault(dfltTransactionTimeDumpSamplesCoefficient);
+ }
+
+ /**
+ * Cluster wide update of {@link #longTransactionTimeDumpSamplesPerSecondLimit}.
+ *
+ * @param limit New limit of samples of completed transactions that will be dumped in log per second.
+ * @return Future for {@link #longTransactionTimeDumpSamplesPerSecondLimit} update operation.
+ * @throws IgniteCheckedException If failed during cluster wide update.
+ */
+ public GridFutureAdapter<?> updateLongTransactionTimeDumpSamplesPerSecondLimitAsync(int limit) throws IgniteCheckedException {
+ return this.longTransactionTimeDumpSamplesPerSecondLimit.propagateAsync(limit);
+ }
+
+ /**
+ * Local update of {@link #longTransactionTimeDumpSamplesPerSecondLimit}.
+ *
+ * @param limit New limit of samples of completed transactions that will be dumped in log per second.
+ */
+ public void updateLongTransactionTimeDumpSamplesPerSecondLimitLocal(int limit) {
+ this.longTransactionTimeDumpSamplesPerSecondLimit.localUpdate(limit);
+ }
+
+ /**
+ * @return Limit of samples of completed transactions that will be dumped in log per second.
+ * See {@link #longTransactionTimeDumpSamplesPerSecondLimit} for more information.
+ */
+ public Integer longTransactionTimeDumpSamplesPerSecondLimit() {
+ return longTransactionTimeDumpSamplesPerSecondLimit.getOrDefault(dfltLongTransactionTimeDumpSamplesPerSecondLimit);
+ }
+
+ /**
+ * Cluster wide update of {@link #collisionsDumpInterval}.
+ *
+ * @param collisionsDumpInterval New collisions dump interval.
+ * @return Future for {@link #collisionsDumpInterval} update operation.
+ * @throws IgniteCheckedException If failed during cluster wide update.
+ */
+ public GridFutureAdapter<?> updateCollisionsDumpIntervalAsync(int collisionsDumpInterval) throws IgniteCheckedException {
+ return this.collisionsDumpInterval.propagateAsync(collisionsDumpInterval);
+ }
+
+ /**
+ * Local update of {@link #collisionsDumpInterval}.
+ *
+ * @param collisionsDumpInterval New collisions dump interval.
+ */
+ public void updateCollisionsDumpIntervalLocal(int collisionsDumpInterval) {
+ this.collisionsDumpInterval.localUpdate(collisionsDumpInterval);
+ }
+
+ /**
+ * @return Collisions dump interval. See {@link #collisionsDumpInterval}.
+ */
+ public Integer collisionsDumpInterval() {
+ return collisionsDumpInterval.getOrDefault(dfltCollisionsDumpInterval);
+ }
+
+ /**
+ * Cluster wide update of {@link #txOwnerDumpRequestsAllowed}.
+ *
+ * @param allowed Allowance to dump requests from local node to near node, when long running transaction is found.
+ * @return Future for {@link #txOwnerDumpRequestsAllowed} update operation.
+ * @throws IgniteCheckedException If failed during cluster wide update.
+ */
+ public GridFutureAdapter<?> updateTxOwnerDumpRequestsAllowedAsync(boolean allowed) throws IgniteCheckedException {
+ return this.txOwnerDumpRequestsAllowed.propagateAsync(allowed);
+ }
+
+ /**
+ * Local update of {@link #txOwnerDumpRequestsAllowed}.
+ *
+ * @param allowed Allowance to dump requests from local node to near node, when long running transaction is found.
+ */
+ public void updateTxOwnerDumpRequestsAllowedLocal(boolean allowed) {
+ this.txOwnerDumpRequestsAllowed.localUpdate(allowed);
+ }
+
+ /**
+ * @return Allowance to dump requests from local node to near node, when long running transaction is found.
+ * See {@link #txOwnerDumpRequestsAllowed} for more information.
+ */
+ public Boolean txOwnerDumpRequestsAllowed() {
+ return txOwnerDumpRequestsAllowed.getOrDefault(dfltTxOwnerDumpRequestsAllowed);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/LongOperationsDumpSettingsClosure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/LongOperationsDumpSettingsClosure.java
deleted file mode 100644
index ed1c418..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/LongOperationsDumpSettingsClosure.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.lang.IgniteRunnable;
-import org.apache.ignite.resources.IgniteInstanceResource;
-
-/**
- * Closure that is sent to the node in order to change
- * "Long operations dump timeout" parameter and also reschedule the task for
- * dumping long operations.
- */
-public class LongOperationsDumpSettingsClosure implements IgniteRunnable {
- /** Serialization ID. */
- private static final long serialVersionUID = 0L;
-
- /** Long operations dump timeout. */
- private final long longOpsDumpTimeout;
-
- /** Auto-inject Ignite instance. */
- @IgniteInstanceResource
- private IgniteEx ignite;
-
- /**
- * Constructor.
- *
- * @param longOpsDumpTimeout Long operations dump timeout.
- */
- public LongOperationsDumpSettingsClosure(long longOpsDumpTimeout) {
- this.longOpsDumpTimeout = longOpsDumpTimeout;
- }
-
- /** {@inheritDoc} */
- @Override public void run() {
- ignite.context().cache().context().tm().longOperationsDumpTimeout(longOpsDumpTimeout);
- }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/LongRunningTxTimeDumpSettingsClosure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/LongRunningTxTimeDumpSettingsClosure.java
deleted file mode 100644
index 5080379..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/LongRunningTxTimeDumpSettingsClosure.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import org.apache.ignite.Ignite;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
-import org.apache.ignite.lang.IgniteRunnable;
-import org.apache.ignite.resources.IgniteInstanceResource;
-
-/**
- * Closure that is sent on all server nodes in order to change configuration parameters
- * of dumping long running transactions' system and user time values.
- */
-public class LongRunningTxTimeDumpSettingsClosure implements IgniteRunnable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private final Long timeoutThreshold;
-
- /** */
- private final Double samplesCoefficient;
-
- /** */
- private final Integer samplesPerSecondLimit;
-
- /**
- * Auto-inject Ignite instance
- */
- @IgniteInstanceResource
- private Ignite ignite;
-
- /** */
- public LongRunningTxTimeDumpSettingsClosure(
- Long timeoutThreshold,
- Double samplesCoefficient,
- Integer samplesPerSecondLimit
- ) {
- this.timeoutThreshold = timeoutThreshold;
- this.samplesCoefficient = samplesCoefficient;
- this.samplesPerSecondLimit = samplesPerSecondLimit;
- }
-
- /** {@inheritDoc} */
- @Override public void run() {
- IgniteTxManager tm = ((IgniteEx) ignite).context().cache().context().tm();
-
- if (timeoutThreshold != null)
- tm.longTransactionTimeDumpThreshold(timeoutThreshold);
-
- if (samplesCoefficient != null)
- tm.transactionTimeDumpSamplesCoefficient(samplesCoefficient);
-
- if (samplesPerSecondLimit != null)
- tm.transactionTimeDumpSamplesPerSecondLimit(samplesPerSecondLimit);
- }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/TxOwnerDumpRequestAllowedSettingClosure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/TxOwnerDumpRequestAllowedSettingClosure.java
deleted file mode 100644
index 882aa89..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/TxOwnerDumpRequestAllowedSettingClosure.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ignite.internal.processors.cache;
-
-import org.apache.ignite.Ignite;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.lang.IgniteRunnable;
-import org.apache.ignite.resources.IgniteInstanceResource;
-
-/**
- * Closure that is sent on all server nodes in order to change transaction configuration parameter
- * that allows or disallows dump requests from local to near nodes while detection long running
- * transactions.
- */
-public class TxOwnerDumpRequestAllowedSettingClosure implements IgniteRunnable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private final boolean allowed;
-
- /**
- * Auto-inject Ignite instance
- */
- @IgniteInstanceResource
- private Ignite ignite;
-
- /** */
- public TxOwnerDumpRequestAllowedSettingClosure(boolean allowed) {
- this.allowed = allowed;
- }
-
- /** {@inheritDoc} */
- @Override public void run() {
- ((IgniteEx)ignite)
- .context()
- .cache()
- .context()
- .tm()
- .setTxOwnerDumpRequestsAllowed(allowed);
- }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileDescriptor.java
index f654c32..eeaab94 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileDescriptor.java
@@ -151,4 +151,9 @@ public class FileDescriptor implements Comparable<FileDescriptor>, AbstractWalRe
return new SegmentIO(idx, fileIO);
}
+
+ /** @return Simple class name followed by the absolute file path (or {@code null}) and {@code idx}. */
+ @Override public String toString() {
+ return getClass().getSimpleName() + " [file=" + (file != null ? file.getAbsolutePath() : null) + ", idx=" + idx + ']';
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index bbab285..f5ec83b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -2863,9 +2863,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
if (start != null) {
if (!F.isEmpty(descs)) {
- if (descs[0].idx() > start.index())
+ if (descs[0].idx() > start.index()) {
throw new IgniteCheckedException("WAL history is too short " +
"[descs=" + Arrays.asList(descs) + ", start=" + start + ']');
+ }
for (AbstractFileDescriptor desc : descs) {
if (desc.idx() == start.index()) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 98c1ed4..a95a223 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -28,6 +28,7 @@ import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@@ -49,6 +50,7 @@ import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.IgniteFeatures;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.cluster.DistributedTransactionConfiguration;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
@@ -71,9 +73,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheReturnCompletableWra
import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheVersionedFuture;
import org.apache.ignite.internal.processors.cache.GridDeferredAckMessageSender;
-import org.apache.ignite.internal.processors.cache.LongOperationsDumpSettingsClosure;
-import org.apache.ignite.internal.processors.cache.LongRunningTxTimeDumpSettingsClosure;
-import org.apache.ignite.internal.processors.cache.TxOwnerDumpRequestAllowedSettingClosure;
import org.apache.ignite.internal.processors.cache.TxTimeoutOnPartitionMapExchangeChangeMessage;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheMappedVersion;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryFuture;
@@ -130,25 +129,15 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_DEFERRED_ONE_PHASE
import static org.apache.ignite.IgniteSystemProperties.IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_DUMP_TX_COLLISIONS_INTERVAL;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_LONG_TRANSACTION_TIME_DUMP_THRESHOLD;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_MAX_COMPLETED_TX_COUNT;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SLOW_TX_WARN_TIMEOUT;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_TRANSACTION_TIME_DUMP_SAMPLES_COEFFICIENT;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_TRANSACTION_TIME_DUMP_SAMPLES_PER_SECOND_LIMIT;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_OWNER_DUMP_REQUESTS_ALLOWED;
-import static org.apache.ignite.IgniteSystemProperties.getLong;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.events.EventType.EVT_TX_STARTED;
import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE_COORDINATOR;
import static org.apache.ignite.internal.GridTopic.TOPIC_TX;
-import static org.apache.ignite.internal.IgniteFeatures.DISTRIBUTED_CHANGE_LONG_OPERATIONS_DUMP_TIMEOUT;
-import static org.apache.ignite.internal.IgniteFeatures.DISTRIBUTED_TX_COLLISIONS_DUMP;
-import static org.apache.ignite.internal.IgniteFeatures.LRT_SYSTEM_USER_TIME_DUMP_SETTINGS;
-import static org.apache.ignite.internal.IgniteFeatures.TRANSACTION_OWNER_THREAD_DUMP_PROVIDING;
-import static org.apache.ignite.internal.IgniteKernal.DFLT_LONG_OPERATIONS_DUMP_TIMEOUT;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.READ;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled;
@@ -156,7 +145,6 @@ import static org.apache.ignite.internal.processors.cache.transactions.IgniteInt
import static org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx.FinalizationStatus.USER_FINISH;
import static org.apache.ignite.internal.processors.security.SecurityUtils.securitySubjectId;
import static org.apache.ignite.internal.util.GridConcurrentFactory.newMap;
-import static org.apache.ignite.internal.util.IgniteUtils.broadcastToNodesSupportingFeature;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionState.ACTIVE;
import static org.apache.ignite.transactions.TransactionState.COMMITTED;
@@ -239,10 +227,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
static int DEADLOCK_MAX_ITERS =
IgniteSystemProperties.getInteger(IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS, DFLT_TX_DEADLOCK_DETECTION_MAX_ITERS);
- /** Collisions dump interval. */
- private volatile int collisionsDumpInterval =
- IgniteSystemProperties.getInteger(IGNITE_DUMP_TX_COLLISIONS_INTERVAL, DFLT_DUMP_TX_COLLISIONS_INTERVAL);
-
/** Lower tx collisions queue size threshold. */
private static final int COLLISIONS_QUEUE_THRESHOLD = 100;
@@ -270,37 +254,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
/** TX handler. */
private IgniteTxHandler txHnd;
- /**
- * Shows if dump requests from local node to near node are allowed, when long running transaction
- * is found. If allowed, the compute request to near node will be made to get thread dump of transaction
- * owner thread.
- */
- private boolean txOwnerDumpRequestsAllowed =
- IgniteSystemProperties.getBoolean(IGNITE_TX_OWNER_DUMP_REQUESTS_ALLOWED, DFLT_TX_OWNER_DUMP_REQUESTS_ALLOWED);
-
- /**
- * Threshold timeout for long transactions, if transaction exceeds it, it will be dumped in log with
- * information about how much time did it spent in system time (time while aquiring locks, preparing,
- * commiting, etc) and user time (time when client node runs some code while holding transaction and not
- * waiting it). Equals 0 if not set. No transactions are dumped in log if this parameter is not set.
- */
- private volatile long longTransactionTimeDumpThreshold = getLong(IGNITE_LONG_TRANSACTION_TIME_DUMP_THRESHOLD, 0);
-
- /**
- * The coefficient for samples of completed transactions that will be dumped in log.
- */
- private volatile double transactionTimeDumpSamplesCoefficient =
- IgniteSystemProperties.getFloat(IGNITE_TRANSACTION_TIME_DUMP_SAMPLES_COEFFICIENT, 0.0f);
-
- /**
- * The limit of samples of completed transactions that will be dumped in log per second, if
- * {@link #transactionTimeDumpSamplesCoefficient} is above <code>0.0</code>. Must be integer value
- * greater than <code>0</code>.
- */
- private volatile int longTransactionTimeDumpSamplesPerSecondLimit =
- IgniteSystemProperties.getInteger(IGNITE_TRANSACTION_TIME_DUMP_SAMPLES_PER_SECOND_LIMIT,
- DFLT_TRANSACTION_TIME_DUMP_SAMPLES_PER_SECOND_LIMIT);
-
/** Committed local transactions. */
private final GridBoundedConcurrentOrderedMap<GridCacheVersion, Boolean> completedVersSorted =
new GridBoundedConcurrentOrderedMap<>(
@@ -321,10 +274,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
/** Slow tx warn timeout. */
private int slowTxWarnTimeout = SLOW_TX_WARN_TIMEOUT;
- /** Long operations dump timeout. */
- private volatile long longOpsDumpTimeout =
- getLong(IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT, DFLT_LONG_OPERATIONS_DUMP_TIMEOUT);
-
/** */
private TxDumpsThrottling txDumpsThrottling = new TxDumpsThrottling();
@@ -358,6 +307,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
/** Key collisions info holder. */
private volatile KeyCollisionsHolder keyCollisionsInfo;
+ /** Distributed transaction configuration. */
+ private DistributedTransactionConfiguration distributedTransactionConfiguration;
+
/** {@inheritDoc} */
@Override protected void onKernalStop0(boolean cancel) {
cctx.gridIO().removeMessageListener(TOPIC_TX);
@@ -434,16 +386,31 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
cctx.txMetrics().onTxManagerStarted();
+ keyCollisionsInfo = new KeyCollisionsHolder();
+
+ distributedTransactionConfiguration = new DistributedTransactionConfiguration(cctx.kernalContext(), log,
+ (String name, Long oldVal, Long newVal) -> {
+ if (!Objects.equals(oldVal, newVal)) {
+ scheduleDumpTask(
+ IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT,
+ () -> cctx.kernalContext().closure().runLocalSafe(
+ () -> cctx.kernalContext().cache().context().exchange().dumpLongRunningOperations(newVal)),
+ newVal);
+ }
+ },
+ (String name, Integer oldVal, Integer newVal) -> {
+ if (!Objects.equals(oldVal, newVal)) {
+ scheduleDumpTask(
+ IGNITE_DUMP_TX_COLLISIONS_INTERVAL,
+ this::collectTxCollisionsInfo,
+ newVal);
+ }
+ });
+
cctx.kernalContext().systemView().registerView(TXS_MON_LIST, TXS_MON_LIST_DESC,
new TransactionViewWalker(),
new ReadOnlyCollectionView2X<>(idMap.values(), nearIdMap.values()),
TransactionView::new);
-
- keyCollisionsInfo = new KeyCollisionsHolder();
-
- longOperationsDumpTimeout(longOperationsDumpTimeout());
-
- txCollisionsDumpInterval(collisionsDumpInterval());
}
/** {@inheritDoc} */
@@ -600,7 +567,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* @return <code>true</code> if allowed, <code>false</code> otherwise.
*/
public boolean txOwnerDumpRequestsAllowed() {
- return txOwnerDumpRequestsAllowed;
+ return distributedTransactionConfiguration.txOwnerDumpRequestsAllowed();
}
/**
@@ -611,7 +578,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* @param allowed whether allowed
*/
public void setTxOwnerDumpRequestsAllowed(boolean allowed) {
- txOwnerDumpRequestsAllowed = allowed;
+ distributedTransactionConfiguration.updateTxOwnerDumpRequestsAllowedLocal(allowed);
}
/**
@@ -623,7 +590,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* @return Threshold timeout in milliseconds.
*/
public long longTransactionTimeDumpThreshold() {
- return longTransactionTimeDumpThreshold;
+ return distributedTransactionConfiguration.longTransactionTimeDumpThreshold();
}
/**
@@ -638,14 +605,14 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
assert longTransactionTimeDumpThreshold >= 0
: "longTransactionTimeDumpThreshold must be greater than or equal to 0.";
- this.longTransactionTimeDumpThreshold = longTransactionTimeDumpThreshold;
+ distributedTransactionConfiguration.updateLongTransactionTimeDumpThresholdLocal(longTransactionTimeDumpThreshold);
}
/**
* The coefficient for samples of completed transactions that will be dumped in log.
*/
public double transactionTimeDumpSamplesCoefficient() {
- return transactionTimeDumpSamplesCoefficient;
+ return distributedTransactionConfiguration.transactionTimeDumpSamplesCoefficient();
}
/**
@@ -655,7 +622,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
assert transactionTimeDumpSamplesCoefficient >= 0.0 && transactionTimeDumpSamplesCoefficient <= 1.0
: "transactionTimeDumpSamplesCoefficient value must be between 0.0 and 1.0 inclusively.";
- this.transactionTimeDumpSamplesCoefficient = transactionTimeDumpSamplesCoefficient;
+ this.distributedTransactionConfiguration.updateTransactionTimeDumpSamplesCoefficientLocal(transactionTimeDumpSamplesCoefficient);
}
/**
@@ -664,7 +631,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* greater than <code>0</code>.
*/
public int transactionTimeDumpSamplesPerSecondLimit() {
- return longTransactionTimeDumpSamplesPerSecondLimit;
+ return distributedTransactionConfiguration.longTransactionTimeDumpSamplesPerSecondLimit();
}
/**
@@ -676,7 +643,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
assert transactionTimeDumpSamplesPerSecondLimit > 0
: "transactionTimeDumpSamplesPerSecondLimit must be integer value greater than 0.";
- this.longTransactionTimeDumpSamplesPerSecondLimit = transactionTimeDumpSamplesPerSecondLimit;
+ distributedTransactionConfiguration.updateLongTransactionTimeDumpSamplesPerSecondLimitLocal(
+ transactionTimeDumpSamplesPerSecondLimit);
}
/**
@@ -2186,14 +2154,14 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* @return Long operations dump timeout.
*/
public long longOperationsDumpTimeout() {
- return longOpsDumpTimeout;
+ return distributedTransactionConfiguration.longOperationsDumpTimeout();
}
/**
* @param longOpsDumpTimeout Long operations dump timeout.
*/
public void longOperationsDumpTimeout(long longOpsDumpTimeout) {
- this.longOpsDumpTimeout = longOpsDumpTimeout;
+ distributedTransactionConfiguration.updateLongOperationsDumpTimeoutLocal(longOpsDumpTimeout);
scheduleDumpTask(
IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT,
@@ -2208,7 +2176,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* @param timeout Sets tx key collisions analysis interval.
**/
void txCollisionsDumpInterval(int timeout) {
- collisionsDumpInterval = timeout;
+ distributedTransactionConfiguration.updateCollisionsDumpIntervalLocal(timeout);
scheduleDumpTask(
IGNITE_DUMP_TX_COLLISIONS_INTERVAL,
@@ -3004,12 +2972,11 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* @param allowed whether allowed
*/
public void setTxOwnerDumpRequestsAllowedDistributed(boolean allowed) {
- broadcastToNodesSupportingFeature(
- cctx.kernalContext(),
- new TxOwnerDumpRequestAllowedSettingClosure(allowed),
- true,
- TRANSACTION_OWNER_THREAD_DUMP_PROVIDING
- );
+ try {
+ distributedTransactionConfiguration.updateTxOwnerDumpRequestsAllowedAsync(allowed).get();
+ } catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
}
/**
@@ -3024,12 +2991,11 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
public void longTransactionTimeDumpThresholdDistributed(long threshold) {
assert threshold >= 0 : "Threshold timeout must be greater than or equal to 0.";
- broadcastToNodesSupportingFeature(
- cctx.kernalContext(),
- new LongRunningTxTimeDumpSettingsClosure(threshold, null, null),
- false,
- LRT_SYSTEM_USER_TIME_DUMP_SETTINGS
- );
+ try {
+ distributedTransactionConfiguration.updateLongTransactionTimeDumpThresholdAsync(threshold).get();
+ } catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
}
/**
@@ -3041,12 +3007,11 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
public void transactionTimeDumpSamplesCoefficientDistributed(double coefficient) {
assert coefficient >= 0.0 && coefficient <= 1.0 : "Percentage value must be between 0.0 and 1.0 inclusively.";
- broadcastToNodesSupportingFeature(
- cctx.kernalContext(),
- new LongRunningTxTimeDumpSettingsClosure(null, coefficient, null),
- false,
- LRT_SYSTEM_USER_TIME_DUMP_SETTINGS
- );
+ try {
+ distributedTransactionConfiguration.updateTransactionTimeDumpSamplesCoefficientAsync(coefficient).get();
+ } catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
}
/**
@@ -3059,12 +3024,11 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
public void longTransactionTimeDumpSamplesPerSecondLimit(int limit) {
assert limit > 0 : "Limit value must be greater than 0.";
- broadcastToNodesSupportingFeature(
- cctx.kernalContext(),
- new LongRunningTxTimeDumpSettingsClosure(null, null, limit),
- false,
- LRT_SYSTEM_USER_TIME_DUMP_SETTINGS
- );
+ try {
+ distributedTransactionConfiguration.updateLongTransactionTimeDumpSamplesPerSecondLimitAsync(limit).get();
+ } catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
}
/** {@inheritDoc} */
@@ -3087,12 +3051,11 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* @param longOpsDumpTimeout Long operations dump timeout.
*/
public void longOperationsDumpTimeoutDistributed(long longOpsDumpTimeout) {
- broadcastToNodesSupportingFeature(
- cctx.kernalContext(),
- new LongOperationsDumpSettingsClosure(longOpsDumpTimeout),
- false,
- DISTRIBUTED_CHANGE_LONG_OPERATIONS_DUMP_TIMEOUT
- );
+ try {
+ distributedTransactionConfiguration.updateLongOperationsDumpTimeoutAsync(longOpsDumpTimeout).get();
+ } catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
}
/**
@@ -3102,7 +3065,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* @return Collisions dump interval.
*/
public int collisionsDumpInterval() {
- return collisionsDumpInterval;
+ return distributedTransactionConfiguration.collisionsDumpInterval();
}
/**
@@ -3112,12 +3075,11 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* @param collisionsDumpInterval New collisions dump interval or negative for disabling.
*/
public void collisionsDumpIntervalDistributed(int collisionsDumpInterval) {
- broadcastToNodesSupportingFeature(
- cctx.kernalContext(),
- new TxCollisionsDumpSettingsClosure(collisionsDumpInterval),
- true,
- DISTRIBUTED_TX_COLLISIONS_DUMP
- );
+ try {
+ distributedTransactionConfiguration.updateCollisionsDumpIntervalAsync(collisionsDumpInterval).get();
+ } catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
}
/**
@@ -3154,6 +3116,13 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
pushCollidingKeysWithQueueSize(entry, qSize);
}
+ /**
+ * @return Distributed configuration.
+ */
+ public DistributedTransactionConfiguration getDistributedTransactionConfiguration() {
+ return distributedTransactionConfiguration;
+ }
+
/** Tx key collisions info holder. */
private final class KeyCollisionsHolder {
/** Stripes count. */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedDoubleProperty.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedDoubleProperty.java
new file mode 100644
index 0000000..b6278f1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedDoubleProperty.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.configuration.distributed;
+
+/**
+ * Implementation of {@link DistributedProperty} for {@link Double}.
+ */
+public class DistributedDoubleProperty extends DistributedComparableProperty<Double> {
+ /** @param name Name of property. */
+ DistributedDoubleProperty(String name) {
+ super(name, Double::parseDouble);
+ }
+
+ /**
+ * @param name Name of property.
+ * @return Property detached from processor (distributed updating is not accessible).
+ */
+ public static DistributedDoubleProperty detachedDoubleProperty(String name) {
+ return new DistributedDoubleProperty(name);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedIntegerProperty.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedIntegerProperty.java
new file mode 100644
index 0000000..5e7b503
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedIntegerProperty.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.configuration.distributed;
+
+/**
+ * Implementation of {@link DistributedProperty} for {@link Integer}.
+ */
+public class DistributedIntegerProperty extends DistributedComparableProperty<Integer> {
+ /** @param name Name of property. */
+ DistributedIntegerProperty(String name) {
+ super(name, Integer::parseInt);
+ }
+
+ /**
+ * @param name Name of property.
+ * @return Property detached from processor (distributed updating is not accessible).
+ */
+ public static DistributedIntegerProperty detachedIntegerProperty(String name) {
+ return new DistributedIntegerProperty(name);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index e06f11e..ca270d6 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -199,7 +199,6 @@ import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteDeploymentCheckedException;
-import org.apache.ignite.internal.IgniteFeatures;
import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -231,6 +230,7 @@ import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxSerializationCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.io.GridFilenameUtils;
import org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryNativeLoader;
@@ -251,6 +251,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteFutureCancelledException;
import org.apache.ignite.lang.IgniteFutureTimeoutException;
import org.apache.ignite.lang.IgniteOutClosure;
@@ -12013,29 +12014,32 @@ public abstract class IgniteUtils {
}
/**
- * Broadcasts given job to nodes that support ignite feature.
+ * Broadcasts given job to nodes that match filter.
*
* @param kctx Kernal context.
* @param job Ignite job.
* @param srvrsOnly Broadcast only on server nodes.
- * @param feature Ignite feature.
+ * @param nodeFilter Node filter.
*/
- public static void broadcastToNodesSupportingFeature(
+ public static IgniteFuture<Void> broadcastToNodesWithFilterAsync(
GridKernalContext kctx,
IgniteRunnable job,
boolean srvrsOnly,
- IgniteFeatures feature
+ IgnitePredicate<ClusterNode> nodeFilter
) {
ClusterGroup cl = kctx.grid().cluster();
if (srvrsOnly)
cl = cl.forServers();
- ClusterGroup grp = cl.forPredicate(node -> IgniteFeatures.nodeSupports(node, feature));
+ ClusterGroup grp = nodeFilter != null ? cl.forPredicate(nodeFilter) : cl;
+
+ if (grp.nodes().isEmpty())
+ return new IgniteFinishedFutureImpl<>();
IgniteCompute compute = kctx.grid().compute(grp);
- compute.broadcast(job);
+ return compute.broadcastAsync(job);
}
/**
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index 419c039..1bb7a46 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -704,7 +704,6 @@ org.apache.ignite.internal.processors.cache.KeyCacheObject
org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl
org.apache.ignite.internal.processors.cache.QueryCursorImpl$State
org.apache.ignite.internal.processors.cache.StoredCacheData
-org.apache.ignite.internal.processors.cache.TxOwnerDumpRequestAllowedSettingClosure
org.apache.ignite.internal.processors.cache.TxTimeoutOnPartitionMapExchangeChangeMessage
org.apache.ignite.internal.processors.cache.WalStateAbstractMessage
org.apache.ignite.internal.processors.cache.WalStateAckMessage
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/TransactionsMXBeanImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/TransactionsMXBeanImplTest.java
index 0a340c1..77c717c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/TransactionsMXBeanImplTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/TransactionsMXBeanImplTest.java
@@ -17,14 +17,22 @@
package org.apache.ignite.internal;
+import java.io.Serializable;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
-import java.util.function.Predicate;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.mxbean.TransactionsMXBean;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.LogListener;
@@ -34,9 +42,15 @@ import org.apache.ignite.transactions.Transaction;
import org.junit.Test;
import static java.util.Collections.singletonMap;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_DUMP_TX_COLLISIONS_INTERVAL;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_LONG_TRANSACTION_TIME_DUMP_THRESHOLD;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_TRANSACTION_TIME_DUMP_SAMPLES_COEFFICIENT;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_TRANSACTION_TIME_DUMP_SAMPLES_PER_SECOND_LIMIT;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_OWNER_DUMP_REQUESTS_ALLOWED;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
import static org.apache.ignite.testframework.LogListener.matches;
@@ -44,6 +58,9 @@ import static org.apache.ignite.testframework.LogListener.matches;
*
*/
public class TransactionsMXBeanImplTest extends GridCommonAbstractTest {
+ /** Prefix of key for distributed meta storage. */
+ private static final String DIST_CONF_PREFIX = "distrConf-";
+
/** Listener log messages. */
private static ListeningTestLogger testLog;
@@ -58,6 +75,11 @@ public class TransactionsMXBeanImplTest extends GridCommonAbstractTest {
}
/** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
stopAllGrids();
@@ -72,6 +94,12 @@ public class TransactionsMXBeanImplTest extends GridCommonAbstractTest {
.setCommunicationSpi(new TestRecordingCommunicationSpi())
.setGridLogger(testLog)
.setClientMode(clientNode)
+ .setDataStorageConfiguration(
+ new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration()
+ .setPersistenceEnabled(true)
+ ))
.setCacheConfiguration(
new CacheConfiguration<>()
.setName(DEFAULT_CACHE_NAME)
@@ -90,6 +118,8 @@ public class TransactionsMXBeanImplTest extends GridCommonAbstractTest {
public void testBasic() throws Exception {
IgniteEx ignite = startGrid(0);
+ ignite.cluster().state(ACTIVE);
+
TransactionsMXBean bean = txMXBean(0);
ignite.transactions().txStart();
@@ -150,110 +180,264 @@ public class TransactionsMXBeanImplTest extends GridCommonAbstractTest {
}
/**
- * Test to verify the correct change of "Long operations dump timeout." in
- * an immutable cluster.
+ * Test to verify the correct change of {@link TransactionsMXBean#getLongOperationsDumpTimeout()}
+ * in an immutable cluster.
*
* @throws Exception If failed.
*/
@Test
@WithSystemProperty(key = IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT, value = "100")
public void testChangeLongOperationsDumpTimeoutOnImmutableCluster() throws Exception {
- Map<IgniteEx, TransactionsMXBean> allNodes = new HashMap<>();
-
- for (int i = 0; i < 2; i++)
- allNodes.put(startGrid(i), txMXBean(i));
+ Map<IgniteEx, TransactionsMXBean> allNodes = startGridAndActivate(2);
+ Map<IgniteEx, TransactionsMXBean> clientNodes = new HashMap<>();
+ Map<IgniteEx, TransactionsMXBean> srvNodes = new HashMap<>(allNodes);
clientNode = true;
- for (int i = 2; i < 4; i++)
- allNodes.put(startGrid(i), txMXBean(i));
+ for (int i = 2; i < 4; i++) {
+ IgniteEx igniteEx = startGrid(i);
+
+ TransactionsMXBean transactionsMXBean = txMXBean(i);
+
+ allNodes.put(igniteEx, transactionsMXBean);
+ clientNodes.put(igniteEx, transactionsMXBean);
+ }
//check for default value
- checkLongOperationsDumpTimeoutViaTxMxBean(allNodes, 100L);
+ checkPropertyValueViaTxMxBean(allNodes, 100L, TransactionsMXBean::getLongOperationsDumpTimeout);
+
+ //create property update latches for client nodes
+ Map<IgniteEx, List<CountDownLatch>> updateLatches = new HashMap<>();
+
+ clientNodes.keySet().forEach(ignite -> updateLatches.put(ignite, F.asList(new CountDownLatch(1), new CountDownLatch(1))));
+
+ clientNodes.forEach((igniteEx, bean) -> igniteEx.context().distributedMetastorage().listen(
+ (key) -> key.startsWith(DIST_CONF_PREFIX),
+ (String key, Serializable oldVal, Serializable newVal) -> {
+ if ((long) newVal == 200)
+ updateLatches.get(igniteEx).get(0).countDown();
+ if ((long) newVal == 300)
+ updateLatches.get(igniteEx).get(1).countDown();
+ }
+ ));
- //check update value via server node
long newTimeout = 200L;
- updateLongOperationsDumpTimeoutViaTxMxBean(allNodes, node -> !node.configuration().isClientMode(), newTimeout);
- checkLongOperationsDumpTimeoutViaTxMxBean(allNodes, newTimeout);
- //check update value via client node
+ //update value via server node
+ updatePropertyViaTxMxBean(allNodes, TransactionsMXBean::setLongOperationsDumpTimeout, newTimeout);
+
+ //check new value in server nodes
+ checkPropertyValueViaTxMxBean(srvNodes, newTimeout, TransactionsMXBean::getLongOperationsDumpTimeout);
+
+ //check new value in client nodes
+ for (List<CountDownLatch> list : updateLatches.values()) {
+ CountDownLatch countDownLatch = list.get(0);
+
+ countDownLatch.await(100, TimeUnit.MILLISECONDS);
+ }
+
newTimeout = 300L;
- updateLongOperationsDumpTimeoutViaTxMxBean(allNodes, node -> node.configuration().isClientMode(), newTimeout);
- checkLongOperationsDumpTimeoutViaTxMxBean(allNodes, newTimeout);
+
+ //update value via client node
+ updatePropertyViaTxMxBean(clientNodes, TransactionsMXBean::setLongOperationsDumpTimeout, newTimeout);
+
+ //check new value in server nodes
+ checkPropertyValueViaTxMxBean(srvNodes, newTimeout, TransactionsMXBean::getLongOperationsDumpTimeout);
+
+ //check new value on client nodes
+ for (List<CountDownLatch> list : updateLatches.values()) {
+ CountDownLatch countDownLatch = list.get(1);
+
+ countDownLatch.await(100, TimeUnit.MILLISECONDS);
+ }
}
/**
- * Test to verify the correct change of "Long operations dump timeout." in
- * an mutable cluster.
+ * Test to verify the correct change of {@link TransactionsMXBean#getLongOperationsDumpTimeout()}
+ * in a mutable cluster.
*
* @throws Exception If failed.
*/
@Test
@WithSystemProperty(key = IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT, value = "100")
public void testChangeLongOperationsDumpTimeoutOnMutableCluster() throws Exception {
- Map<IgniteEx, TransactionsMXBean> node0 = singletonMap(startGrid(0), txMXBean(0));
- Map<IgniteEx, TransactionsMXBean> node1 = singletonMap(startGrid(1), txMXBean(1));
-
- Map<IgniteEx, TransactionsMXBean> allNodes = new HashMap<>();
- allNodes.putAll(node0);
- allNodes.putAll(node1);
+ Map<IgniteEx, TransactionsMXBean> allNodes = startGridAndActivate(2);
long newTimeout = 200L;
- updateLongOperationsDumpTimeoutViaTxMxBean(allNodes, node -> true, newTimeout);
- checkLongOperationsDumpTimeoutViaTxMxBean(allNodes, newTimeout);
- allNodes.clear();
- stopGrid(1);
- node1 = singletonMap(startGrid(1), txMXBean(1));
+ updatePropertyViaTxMxBean(allNodes, TransactionsMXBean::setLongOperationsDumpTimeout, newTimeout);
+
+ checkPropertyValueViaTxMxBean(allNodes, newTimeout, TransactionsMXBean::getLongOperationsDumpTimeout);
+
+ stopAllGrids();
+
+ allNodes = startGridAndActivate(2);
- //check that default value in restart node
- long defTimeout = 100L;
- checkLongOperationsDumpTimeoutViaTxMxBean(node0, newTimeout);
- checkLongOperationsDumpTimeoutViaTxMxBean(node1, defTimeout);
+ //check that the new value is kept after restart
+ checkPropertyValueViaTxMxBean(allNodes, newTimeout, TransactionsMXBean::getLongOperationsDumpTimeout);
newTimeout = 300L;
- updateLongOperationsDumpTimeoutViaTxMxBean(node0, node -> true, newTimeout);
- Map<IgniteEx, TransactionsMXBean> node2 = singletonMap(startGrid(2), txMXBean(2));
+ updatePropertyViaTxMxBean(allNodes, TransactionsMXBean::setLongOperationsDumpTimeout, newTimeout);
+
+ allNodes.putAll(singletonMap(startGrid(2), txMXBean(2)));
+
+ //check that the last value is visible on the new node
+ checkPropertyValueViaTxMxBean(allNodes, newTimeout, TransactionsMXBean::getLongOperationsDumpTimeout);
+ }
+
+ /**
+ * Test to verify the correct change of {@link TransactionsMXBean#getTxOwnerDumpRequestsAllowed()}
+ * in a mutable cluster.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ @WithSystemProperty(key = IGNITE_TX_OWNER_DUMP_REQUESTS_ALLOWED, value = "false")
+ public void testChangeTxOwnerDumpRequestsAllowed() throws Exception {
+ checkPropertyChangingViaTxMxBean(false, true, TransactionsMXBean::getTxOwnerDumpRequestsAllowed,
+ TransactionsMXBean::setTxOwnerDumpRequestsAllowed);
+ }
- //check that default value in new node
- checkLongOperationsDumpTimeoutViaTxMxBean(node0, newTimeout);
- checkLongOperationsDumpTimeoutViaTxMxBean(node1, newTimeout);
- checkLongOperationsDumpTimeoutViaTxMxBean(node2, defTimeout);
+ /**
+ * Test to verify the correct change of {@link TransactionsMXBean#getLongTransactionTimeDumpThreshold()}
+ * in a mutable cluster.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ @WithSystemProperty(key = IGNITE_LONG_TRANSACTION_TIME_DUMP_THRESHOLD, value = "0")
+ public void testChangeLongTransactionTimeDumpThreshold() throws Exception {
+ checkPropertyChangingViaTxMxBean(0L, 999L, TransactionsMXBean::getLongTransactionTimeDumpThreshold,
+ TransactionsMXBean::setLongTransactionTimeDumpThreshold);
}
/**
- * Search for the first node by the predicate and change
- * "Long operations dump timeout." through TransactionsMXBean.
+ * Test to verify the correct change of {@link TransactionsMXBean#getTransactionTimeDumpSamplesCoefficient()}
+ * in a mutable cluster.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ @WithSystemProperty(key = IGNITE_TRANSACTION_TIME_DUMP_SAMPLES_COEFFICIENT, value = "0.0")
+ public void testChangeTransactionTimeDumpSamplesCoefficient() throws Exception {
+ checkPropertyChangingViaTxMxBean(0.0, 1.0, TransactionsMXBean::getTransactionTimeDumpSamplesCoefficient,
+ TransactionsMXBean::setTransactionTimeDumpSamplesCoefficient);
+ }
+
+ /**
+ * Test to verify the correct change of {@link TransactionsMXBean#getTransactionTimeDumpSamplesPerSecondLimit()}
+ * in a mutable cluster.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ @WithSystemProperty(key = IGNITE_TRANSACTION_TIME_DUMP_SAMPLES_PER_SECOND_LIMIT, value = "5")
+ public void testChangeLongTransactionTimeDumpSamplesPerSecondLimit() throws Exception {
+ checkPropertyChangingViaTxMxBean(5, 10, TransactionsMXBean::getTransactionTimeDumpSamplesPerSecondLimit,
+ TransactionsMXBean::setTransactionTimeDumpSamplesPerSecondLimit);
+ }
+
+ /**
+ * Test to verify the correct change of {@link TransactionsMXBean#getTxKeyCollisionsInterval()}
+ * in a mutable cluster.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ @WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = "1100")
+ public void testChangeCollisionsDumpInterval() throws Exception {
+ checkPropertyChangingViaTxMxBean(1100, 1200, TransactionsMXBean::getTxKeyCollisionsInterval,
+ TransactionsMXBean::setTxKeyCollisionsInterval);
+ }
+
+ /**
+ * Test to verify the correct change of a property in a mutable cluster.
+ *
+ * @param defVal Default value.
+ * @param newVal New value.
+ * @param getter Getter of property.
+ * @param setter Setter of property.
+ * @param <T> Type of property.
+ * @throws Exception If failed.
+ */
+ private <T> void checkPropertyChangingViaTxMxBean(T defVal, T newVal, Function<TransactionsMXBean, T> getter,
+ BiConsumer<TransactionsMXBean, T> setter
+ ) throws Exception {
+ Map<IgniteEx, TransactionsMXBean> allNodes = startGridAndActivate(2);
+
+ //check for default value
+ checkPropertyValueViaTxMxBean(allNodes, defVal, getter);
+
+ updatePropertyViaTxMxBean(allNodes, setter, newVal);
+
+ //check new value
+ checkPropertyValueViaTxMxBean(allNodes, newVal, getter);
+
+ stopAllGrids();
+
+ allNodes = startGridAndActivate(2);
+
+ //check that the new value is kept after restart
+ checkPropertyValueViaTxMxBean(allNodes, newVal, getter);
+
+ allNodes.putAll(singletonMap(startGrid(2), txMXBean(2)));
+
+ //check that the last value is kept after adding a new node
+ checkPropertyValueViaTxMxBean(allNodes, newVal, getter);
+ }
+
+ /**
+ * Start grid and activate.
+ *
+ * @param cnt Nodes count.
+ * @return Map with started nodes.
+ * @throws Exception If anything failed.
+ */
+ private Map<IgniteEx, TransactionsMXBean> startGridAndActivate(int cnt) throws Exception {
+ Map<IgniteEx, TransactionsMXBean> nodes = new HashMap<>();
+ IgniteEx ignite = null;
+
+ for (int i = 0; i < cnt; i++) {
+ ignite = startGrid(i);
+
+ nodes.put(ignite, txMXBean(i));
+ }
+
+ if (ignite != null)
+ ignite.cluster().state(ACTIVE);
+
+ return nodes;
+ }
+
+ /**
+ * Pick any node and change the property through its TransactionsMXBean.
*
* @param nodes Nodes with TransactionsMXBean's.
- * @param nodePred Predicate to search for a node.
- * @param newTimeout New timeout.
+ * @param setter Setter that applies {@code val} to the chosen TransactionsMXBean.
*/
- private void updateLongOperationsDumpTimeoutViaTxMxBean(
+ private <T> void updatePropertyViaTxMxBean(
Map<IgniteEx, TransactionsMXBean> nodes,
- Predicate<? super IgniteEx> nodePred,
- long newTimeout
+ BiConsumer<TransactionsMXBean, T> setter,
+ T val
) {
assertNotNull(nodes);
- assertNotNull(nodePred);
- nodes.entrySet().stream()
- .filter(e -> nodePred.test(e.getKey()))
- .findAny().get().getValue().setLongOperationsDumpTimeout(newTimeout);
+ setter.accept(nodes.entrySet().stream()
+ .findAny().get().getValue(), val);
}
/**
- * Checking the value of "Long operations dump timeout." on all nodes
- * through the TransactionsMXBean.
+ * Checking the value of property on nodes through the TransactionsMXBean.
*
* @param nodes Nodes with TransactionsMXBean's.
- * @param checkTimeout Checked timeout.
+ * @param checkedVal Checked value.
*/
- private void checkLongOperationsDumpTimeoutViaTxMxBean(Map<IgniteEx, TransactionsMXBean> nodes, long checkTimeout) {
+ private static <T> void checkPropertyValueViaTxMxBean(Map<IgniteEx, TransactionsMXBean> nodes, T checkedVal,
+ Function<TransactionsMXBean, T> getter) {
assertNotNull(nodes);
- nodes.forEach((node, txMxBean) -> assertEquals(checkTimeout, txMxBean.getLongOperationsDumpTimeout()));
+ nodes.forEach((node, txMxBean) -> assertEquals(checkedVal, getter.apply(txMxBean)));
}
/**
@@ -272,10 +456,15 @@ public class TransactionsMXBeanImplTest extends GridCommonAbstractTest {
boolean expectTx
) throws Exception {
IgniteEx ignite = startGrid(0);
+ IgniteEx ignite1 = startGrid(1);
+
+ ignite.cluster().state(ACTIVE);
TransactionsMXBean txMXBean = txMXBean(0);
+ TransactionsMXBean txMXBean1 = txMXBean(1);
assertEquals(defTimeout, txMXBean.getLongOperationsDumpTimeout());
+ assertEquals(defTimeout, txMXBean1.getLongOperationsDumpTimeout());
Transaction tx = ignite.transactions().txStart();
@@ -288,6 +477,7 @@ public class TransactionsMXBeanImplTest extends GridCommonAbstractTest {
txMXBean.setLongOperationsDumpTimeout(newTimeout);
assertEquals(newTimeout, ignite.context().cache().context().tm().longOperationsDumpTimeout());
+ assertEquals(newTimeout, ignite1.context().cache().context().tm().longOperationsDumpTimeout());
if (expectTx)
assertTrue(waitForCondition(() -> lrtLogLsnr.check() && txLogLsnr.check(), waitTimeTx));
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridTransactionsSystemUserTimeMetricsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridTransactionsSystemUserTimeMetricsTest.java
index 4f11086..af2a130 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridTransactionsSystemUserTimeMetricsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridTransactionsSystemUserTimeMetricsTest.java
@@ -16,11 +16,14 @@
*/
package org.apache.ignite.internal.processors.cache;
+import java.io.Serializable;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.LongStream;
import javax.management.AttributeNotFoundException;
@@ -34,6 +37,7 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.TransactionsMXBeanImpl;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
@@ -104,6 +108,9 @@ public class GridTransactionsSystemUserTimeMetricsTest extends GridCommonAbstrac
private static final String ROLLBACK_TIME_DUMP_REGEX =
".*?Long transaction time dump .*?cacheOperationsTime=[0-9]{1,4}.*?rollbackTime=[0-9]{1,4}.*";
+ /** Prefix of key for distributed meta storage. */
+ private static final String DIST_CONF_PREFIX = "distrConf-";
+
/** */
private LogListener logTxDumpLsnr = new MessageOrderLogListener(TRANSACTION_TIME_DUMP_REGEX);
@@ -233,23 +240,80 @@ public class GridTransactionsSystemUserTimeMetricsTest extends GridCommonAbstrac
* @param coefficient Transaction time dump samples coefficient.
* @param limit Transaction time dump samples per second limit.
* @return Transaction MX bean.
- * @throws Exception If failed.
+ * @throws InterruptedException If the current thread is interrupted while waiting.
*/
- private TransactionsMXBean applyJmxParameters(Long threshold, Double coefficient, Integer limit) throws Exception {
+ private TransactionsMXBean applyJmxParameters(Long threshold, Double coefficient, Integer limit) throws InterruptedException {
TransactionsMXBean tmMxBean = getMxBean(
CLIENT,
"Transactions",
TransactionsMXBeanImpl.class,
TransactionsMXBean.class);
- if (threshold != null)
+ return applyJmxParameters(threshold, coefficient, limit, tmMxBean, client);
+ }
+
+ /**
+ * Applies JMX parameters to a node at runtime. Parameters are spread across the cluster, so this method
+ * allows changing system/user time tracking without restarting the cluster.
+ *
+ * @param threshold Long transaction time dump threshold.
+ * @param coefficient Transaction time dump samples coefficient.
+ * @param limit Transaction time dump samples per second limit.
+ * @param tmMxBean Instance of {@link TransactionsMXBean}.
+ * @param ignite Node.
+ * @return Transaction MX bean.
+ * @throws InterruptedException If the current thread is interrupted while waiting.
+ */
+ private TransactionsMXBean applyJmxParameters(Long threshold, Double coefficient, Integer limit,
+ TransactionsMXBean tmMxBean, Ignite ignite
+ ) throws InterruptedException {
+ IgniteEx igniteEx = (IgniteEx)ignite;
+
+ CountDownLatch thresholdLatch = new CountDownLatch(1);
+ CountDownLatch coefficientLatch = new CountDownLatch(1);
+ CountDownLatch limitLatch = new CountDownLatch(1);
+
+ if (threshold != null) {
+ igniteEx.context().distributedMetastorage().listen(
+ (key) -> key.startsWith(DIST_CONF_PREFIX),
+ (String key, Serializable oldVal, Serializable newVal) -> {
+ if (key.endsWith("longTransactionTimeDumpThreshold") && (long) newVal == threshold)
+ thresholdLatch.countDown();
+ });
+
tmMxBean.setLongTransactionTimeDumpThreshold(threshold);
+ }
+
+ if (coefficient != null) {
+ igniteEx.context().distributedMetastorage().listen(
+ (key) -> key.startsWith(DIST_CONF_PREFIX),
+ (String key, Serializable oldVal, Serializable newVal) -> {
+ if (key.endsWith("transactionTimeDumpSamplesCoefficient") && (double) newVal == coefficient)
+ coefficientLatch.countDown();
+ });
- if (coefficient != null)
tmMxBean.setTransactionTimeDumpSamplesCoefficient(coefficient);
+ }
+
+ if (limit != null) {
+ igniteEx.context().distributedMetastorage().listen(
+ (key) -> key.startsWith(DIST_CONF_PREFIX),
+ (String key, Serializable oldVal, Serializable newVal) -> {
+ if (key.endsWith("longTransactionTimeDumpSamplesPerSecondLimit") && (int) newVal == limit)
+ limitLatch.countDown();
+ });
- if (limit != null)
tmMxBean.setTransactionTimeDumpSamplesPerSecondLimit(limit);
+ }
+
+ if (threshold != null)
+ thresholdLatch.await(300, TimeUnit.MILLISECONDS);
+
+ if (coefficient != null)
+ coefficientLatch.await(300, TimeUnit.MILLISECONDS);
+
+ if (limit != null)
+ limitLatch.await(300, TimeUnit.MILLISECONDS);
return tmMxBean;
}
@@ -488,7 +552,7 @@ public class GridTransactionsSystemUserTimeMetricsTest extends GridCommonAbstrac
*/
@Test
public void testJmxParametersSpreading() throws Exception {
- startClientGrid(CLIENT_2);
+ IgniteEx client2 = startGrid(CLIENT_2);
try {
TransactionsMXBean tmMxBean = getMxBean(
@@ -512,18 +576,15 @@ public class GridTransactionsSystemUserTimeMetricsTest extends GridCommonAbstrac
long newThreshold = 99999;
double newCoefficient = 0.01;
- tmMxBean.setTransactionTimeDumpSamplesPerSecondLimit(newLimit);
- tmMxBean2.setLongTransactionTimeDumpThreshold(newThreshold);
- tmMxBean.setTransactionTimeDumpSamplesCoefficient(newCoefficient);
+ applyJmxParameters(null, newCoefficient, newLimit);
+ applyJmxParameters(newThreshold, null, null, tmMxBean2, client2);
assertEquals(newLimit, tmMxBean2.getTransactionTimeDumpSamplesPerSecondLimit());
assertEquals(newThreshold, tmMxBean.getLongTransactionTimeDumpThreshold());
assertTrue(tmMxBean2.getTransactionTimeDumpSamplesCoefficient() - newCoefficient < 0.0001);
}
finally {
- tmMxBean.setTransactionTimeDumpSamplesPerSecondLimit(oldLimit);
- tmMxBean.setLongTransactionTimeDumpThreshold(oldThreshold);
- tmMxBean.setTransactionTimeDumpSamplesCoefficient(oldCoefficient);
+ applyJmxParameters(oldThreshold, oldCoefficient, oldLimit);
}
}
finally {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java
index 8d22823..db87461 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java
@@ -401,9 +401,9 @@ public class IgniteWalHistoryReservationsTest extends GridCommonAbstractTest {
public void testWalHistoryPartiallyRemoved() throws Exception {
Assume.assumeFalse("https://issues.apache.org/jira/browse/IGNITE-10551", MvccFeatureChecker.forcedMvcc());
- int entryCnt = 10_000;
+ int entryCnt = 9_500;
- IgniteEx ig0 = (IgniteEx)startGrids(2);
+ IgniteEx ig0 = startGrids(2);
ig0.cluster().active(true);