You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2019/04/26 10:18:48 UTC
[ignite] 11/17: GG-17387 Transactional Datacenter Replication
implemented
This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch gg-17462
in repository https://gitbox.apache.org/repos/asf/ignite.git
commit f188175382dab58e0c04321659e9ff1d03019a64
Author: Andrey Gura <ag...@apache.org>
AuthorDate: Wed Apr 24 20:44:55 2019 +0300
GG-17387 Transactional Datacenter Replication implemented
Contributors list:
Aleksey Plekhanov <Pl...@gmail.com>
Andrey Gura <ag...@apache.org>
Andrey Kuznetsov <st...@gmail.com>
Dmitriy Sorokin <d....@gmail.com>
Ivan Daschinskiy <iv...@gmail.com>
Ivan Rakov <iv...@gmail.com>
Sergey Antonov <an...@gmail.com>
Sergey Kosarev <sk...@gridgain.com>
Vyacheslav Koptilin <sl...@gmail.com>
---
.../org/apache/ignite/IgniteSystemProperties.java | 2 +
.../org/apache/ignite/internal/GridComponent.java | 5 +-
.../apache/ignite/internal/GridKernalContext.java | 8 +
.../ignite/internal/GridKernalContextImpl.java | 12 +
.../java/org/apache/ignite/internal/GridTopic.java | 5 +-
.../org/apache/ignite/internal/IgniteKernal.java | 21 +-
.../ignite/internal/cluster/IgniteClusterImpl.java | 15 +-
.../communication/GridIoMessageFactory.java | 1 +
.../managers/discovery/GridDiscoveryManager.java | 20 +
.../pagemem/wal/record/ConsistentCutRecord.java | 26 +
.../internal/pagemem/wal/record/DataEntry.java | 5 +-
.../internal/pagemem/wal/record/WALRecord.java | 5 +-
.../processors/cache/CacheGroupContext.java | 1 +
.../processors/cache/GridCacheMapEntry.java | 2 +
.../cache/GridCachePartitionExchangeManager.java | 118 ++-
.../internal/processors/cache/WalStateManager.java | 2 +-
.../cache/binary/BinaryMetadataHolder.java | 4 +-
.../GridDistributedTxRemoteAdapter.java | 23 +
.../dht/GridDhtTopologyFutureAdapter.java | 10 +-
.../dht/IgniteClusterReadOnlyException.java | 41 +
.../distributed/dht/preloader/ExchangeType.java | 30 +
.../dht/preloader/GridDhtPartitionDemander.java | 6 +-
.../dht/preloader/GridDhtPartitionSupplier.java | 45 +-
.../preloader/GridDhtPartitionsExchangeFuture.java | 74 +-
.../dht/preloader/GridDhtPreloader.java | 6 +
.../dht/preloader/PartitionsExchangeAware.java | 37 +
.../dht/topology/GridClientPartitionTopology.java | 7 +-
.../dht/topology/GridDhtPartitionTopology.java | 8 +-
.../dht/topology/GridDhtPartitionTopologyImpl.java | 29 +-
.../near/GridNearTxPrepareFutureAdapter.java | 1 +
.../GridCacheDatabaseSharedManager.java | 348 ++++++---
.../cache/persistence/metastorage/MetaStorage.java | 12 +-
.../cache/persistence/tree/BPlusTree.java | 30 +-
.../wal/AbstractWalRecordsIterator.java | 11 +-
.../persistence/wal/FileWriteAheadLogManager.java | 4 +-
.../cache/persistence/wal/aware/SegmentAware.java | 6 +-
.../wal/aware/SegmentCompressStorage.java | 39 +-
.../wal/reader/IgniteWalIteratorFactory.java | 46 +-
.../wal/reader/StandaloneGridKernalContext.java | 6 +
.../wal/reader/StandaloneWalRecordsIterator.java | 11 +-
.../wal/serializer/RecordDataV2Serializer.java | 10 +
.../wal/serializer/RecordSerializerFactory.java | 5 +
.../serializer/RecordSerializerFactoryImpl.java | 8 +
.../cache/transactions/IgniteTxLocalAdapter.java | 3 +
.../cache/transactions/IgniteTxManager.java | 42 +-
.../LocalPendingTransactionsTracker.java | 590 ++++++++++++++
.../cache/transactions/TrackCommittedResult.java | 57 ++
.../cache/version/GridCacheVersionManager.java | 16 +-
.../cluster/BaselineTopologyHistoryItem.java | 20 +
.../cluster/ChangeGlobalStateMessage.java | 8 +
.../cluster/GridClusterStateProcessor.java | 234 ++++--
.../txdr/NoOpTransactionalDrProcessor.java | 82 ++
.../processors/txdr/TransactionalDrProcessor.java | 77 ++
.../ignite/internal/util/GridCircularBuffer.java | 68 +-
.../apache/ignite/internal/util/IgniteUtils.java | 37 +
.../ignite/internal/util/StripedExecutor.java | 10 +
.../org/apache/ignite/logger/EchoingLogger.java | 144 ++++
.../ignite/spi/discovery/tcp/TcpDiscoverySpi.java | 10 +
.../cache/ClusterReadOnlyModeAbstractTest.java | 10 +
.../processors/cache/ClusterReadOnlyModeTest.java | 12 +-
...ocalWalModeChangeDuringRebalancingSelfTest.java | 2 +
.../wal/IgniteNodeStoppedDuringDisableWALTest.java | 2 +-
.../persistence/wal/aware/SegmentAwareTest.java | 57 +-
.../reader/StandaloneWalRecordsIteratorTest.java | 213 ++++-
.../LocalPendingTransactionsTrackerTest.java | 853 +++++++++++++++++++++
.../lang/utils/GridCircularBufferSelfTest.java | 79 ++
.../junits/common/GridCommonAbstractTest.java | 16 +
.../apache/ignite/util/GridCommandHandlerTest.java | 2 +-
.../cache/ClusterReadOnlyModeSqlTest.java | 16 +-
.../spi/discovery/zk/ZookeeperDiscoverySpi.java | 11 +
.../spi/discovery/zk/internal/ZkRuntimeState.java | 2 +-
.../zk/internal/ZookeeperDiscoveryImpl.java | 9 +
.../zk/ZookeeperDiscoverySpiTestUtil.java | 11 +-
parent/pom.xml | 2 +-
74 files changed, 3462 insertions(+), 338 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 886e8c1..85b44f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -265,6 +265,8 @@ public final class IgniteSystemProperties {
* System property to enable pending transaction tracker.
* Affects impact of {@link IgniteSystemProperties#IGNITE_DISABLE_WAL_DURING_REBALANCING} property:
* if this property is set, WAL anyway won't be disabled during rebalancing triggered by baseline topology change.
+ * Setting this property before grid start is no longer necessary, because the pending transactions tracker
+ * is enabled automatically when needed; the property should be used for test purposes only.
*/
public static final String IGNITE_PENDING_TX_TRACKER_ENABLED = "IGNITE_PENDING_TX_TRACKER_ENABLED";
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
index 6f6f8d1..5861665 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
@@ -75,7 +75,10 @@ public interface GridComponent {
SERVICE_PROC,
/** Distributed MetaStorage processor. */
- META_STORAGE;
+ META_STORAGE,
+
+ /** Transactional data replication processor. */
+ TX_DR_PROC;
/** Cached values array. */
public static final DiscoveryDataExchangeType[] VALUES = values();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index 6906ea0..c89831e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -71,6 +71,7 @@ import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor;
import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
import org.apache.ignite.internal.processors.task.GridTaskProcessor;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
+import org.apache.ignite.internal.processors.txdr.TransactionalDrProcessor;
import org.apache.ignite.internal.stat.IoStatisticsManager;
import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
import org.apache.ignite.internal.util.IgniteExceptionRegistry;
@@ -429,6 +430,13 @@ public interface GridKernalContext extends Iterable<GridComponent> {
public GridSecurityProcessor security();
/**
+ * Gets transactional data replication processor.
+ *
+ * @return Transactional data replication processor.
+ */
+ public TransactionalDrProcessor txDr();
+
+ /**
* Gets load balancing manager.
*
* @return Load balancing manager.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 4c900a2..e5fddfd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -90,6 +90,7 @@ import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor;
import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
import org.apache.ignite.internal.processors.task.GridTaskProcessor;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
+import org.apache.ignite.internal.processors.txdr.TransactionalDrProcessor;
import org.apache.ignite.internal.stat.IoStatisticsManager;
import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
import org.apache.ignite.internal.util.IgniteExceptionRegistry;
@@ -321,6 +322,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
/** */
@GridToStringExclude
+ private TransactionalDrProcessor txDrProc;
+
+ /** */
+ @GridToStringExclude
private List<GridComponent> comps = new LinkedList<>();
/** */
@@ -672,6 +677,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
authProc = (IgniteAuthenticationProcessor)comp;
else if (comp instanceof CompressionProcessor)
compressProc = (CompressionProcessor)comp;
+ else if (comp instanceof TransactionalDrProcessor)
+ txDrProc = (TransactionalDrProcessor)comp;
else if (!(comp instanceof DiscoveryNodeValidationProcessor
|| comp instanceof PlatformPluginProcessor))
assert (comp instanceof GridPluginComponent) : "Unknown manager class: " + comp.getClass();
@@ -845,6 +852,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
}
/** {@inheritDoc} */
+ @Override public TransactionalDrProcessor txDr() {
+ return txDrProc;
+ }
+
+ /** {@inheritDoc} */
@Override public GridLoadBalancerManager loadBalancing() {
return loadMgr;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
index 4450166..2d25ef2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
@@ -141,7 +141,10 @@ public enum GridTopic {
TOPIC_SERVICES,
/** */
- TOPIC_DEADLOCK_DETECTION;
+ TOPIC_DEADLOCK_DETECTION,
+
+ /** */
+ TOPIC_TXDR;
/** Enum values. */
private static final GridTopic[] VALS = values();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 638755f..fa242cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -81,7 +81,6 @@ import org.apache.ignite.Ignition;
import org.apache.ignite.MemoryMetrics;
import org.apache.ignite.PersistenceMetrics;
import org.apache.ignite.cache.affinity.Affinity;
-import org.apache.ignite.cluster.BaselineNode;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterMetrics;
import org.apache.ignite.cluster.ClusterNode;
@@ -170,6 +169,8 @@ import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor;
import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
import org.apache.ignite.internal.processors.task.GridTaskProcessor;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
+import org.apache.ignite.internal.processors.txdr.NoOpTransactionalDrProcessor;
+import org.apache.ignite.internal.processors.txdr.TransactionalDrProcessor;
import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
import org.apache.ignite.internal.suggestions.JvmConfigurationSuggestions;
import org.apache.ignite.internal.suggestions.OsConfigurationSuggestions;
@@ -1031,6 +1032,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
startProcessor(new DistributedMetaStorageImpl(ctx));
startProcessor(new DistributedConfigurationProcessor(ctx));
+ // Start transactional data replication processor.
+ startProcessor(createComponent(TransactionalDrProcessor.class, ctx));
+
// Start plugins.
for (PluginProvider provider : ctx.plugins().allProviders()) {
ctx.add(new GridPluginComponent(provider));
@@ -3658,18 +3662,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
cluster().active(active);
}
- /** */
- private Collection<BaselineNode> baselineNodes() {
- Collection<ClusterNode> srvNodes = cluster().forServers().nodes();
-
- ArrayList baselineNodes = new ArrayList(srvNodes.size());
-
- for (ClusterNode clN : srvNodes)
- baselineNodes.add(clN);
-
- return baselineNodes;
- }
-
/** {@inheritDoc} */
@Override public void resetLostPartitions(Collection<String> cacheNames) {
CU.validateCacheNames(cacheNames);
@@ -4149,6 +4141,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
if (cls.equals(IGridClusterStateProcessor.class))
return (T)new GridClusterStateProcessor(ctx);
+ if (cls.equals(TransactionalDrProcessor.class))
+ return (T)new NoOpTransactionalDrProcessor(ctx);
+
Class<T> implCls = null;
try {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
index 32691db..69f8f05 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
@@ -1,12 +1,12 @@
/*
* Copyright 2019 GridGain Systems, Inc. and Contributors.
- *
+ *
* Licensed under the GridGain Community Edition License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -43,8 +43,6 @@ import org.apache.ignite.cluster.ClusterGroupEmptyException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterStartNodeResult;
import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.events.ClusterActivationEvent;
-import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteComponentType;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -479,7 +477,7 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus
target.add(node);
}
- validateBeforeBaselineChange(target);
+ ctx.state().validateBeforeBaselineChange(target);
ctx.state().changeGlobalState(true, target, true, isBaselineAutoAdjust).get();
}
@@ -767,8 +765,7 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus
// If there is nothing to start, return finished future with empty result.
if (nodeCallCnt == 0)
- return new GridFinishedFuture<Collection<ClusterStartNodeResult>>(
- Collections.<ClusterStartNodeResult>emptyList());
+ return new GridFinishedFuture<>(Collections.emptyList());
// Exceeding max line width for readability.
GridCompoundFuture<ClusterStartNodeResult, Collection<ClusterStartNodeResult>> fut =
@@ -887,4 +884,4 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus
@Override public String toString() {
return "IgniteCluster [igniteInstanceName=" + ctx.igniteInstanceName() + ']';
}
-}
\ No newline at end of file
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index d716e84..31b06fa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -1159,6 +1159,7 @@ public class GridIoMessageFactory implements MessageFactory {
// [120..123] - DR
// [-4..-22, -30..-35] - SQL
// [2048..2053] - Snapshots
+ // [4096..4096] - TxDR
default:
if (ext != null) {
for (MessageFactory factory : ext) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index aa39f8f..a5725a7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -24,6 +24,7 @@ import java.lang.management.MemoryUsage;
import java.lang.management.OperatingSystemMXBean;
import java.lang.management.RuntimeMXBean;
import java.lang.management.ThreadMXBean;
+import java.lang.reflect.InvocationTargetException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
@@ -2343,6 +2344,25 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
/**
+ * Sets grid start time by reflectively invoking {@code setGridStartTime(long)} on the configured discovery SPI.
+ * If the SPI class has no such public method, an error is logged and the call has no further effect.
+ *
+ * @param val New time value.
+ * @throws IgniteException If the SPI method is not accessible or fails when invoked.
+ */
+ public void setGridStartTime(long val) {
+ DiscoverySpi spi = getSpi();
+
+ try {
+ // Looked up on the concrete SPI class; presumably the DiscoverySpi interface does not declare it - confirm.
+ spi.getClass().getMethod("setGridStartTime", long.class).invoke(spi, val);
+ }
+ catch (NoSuchMethodException e) {
+ // A missing method is reported but not treated as fatal.
+ U.error(log, "Discovery SPI has no 'setGridStartTime(long)' method [class=" + spi.getClass() + ']', e);
+ }
+ catch (IllegalAccessException | InvocationTargetException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /**
* @param nodeId Node ID.
* @param warning Warning message to be shown on all nodes.
* @return Whether node is failed.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/ConsistentCutRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/ConsistentCutRecord.java
new file mode 100644
index 0000000..822efd6f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/ConsistentCutRecord.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.pagemem.wal.record;
+
+/**
+ * WAL record of type {@link RecordType#CONSISTENT_CUT}. TODO GG-13416: document consistent cut semantics.
+ */
+public class ConsistentCutRecord extends WALRecord {
+ /** {@inheritDoc} */
+ @Override public RecordType type() {
+ return RecordType.CONSISTENT_CUT;
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java
index add5b70..7c039e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java
@@ -97,8 +97,9 @@ public class DataEntry {
this.partId = partId;
this.partCnt = partCnt;
- // Only READ, CREATE, UPDATE and DELETE operations should be stored in WAL.
- assert op == GridCacheOperation.READ || op == GridCacheOperation.CREATE || op == GridCacheOperation.UPDATE || op == GridCacheOperation.DELETE : op;
+ assert op == GridCacheOperation.READ || op == GridCacheOperation.CREATE ||
+ op == GridCacheOperation.UPDATE || op == GridCacheOperation.DELETE :
+ "Only READ, CREATE, UPDATE and DELETE operations should be stored in WAL [op=" + op + ']';
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
index 4406ffa..4751502 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
@@ -207,7 +207,10 @@ public abstract class WALRecord {
MVCC_DATA_RECORD (LOGICAL),
/** Mvcc Tx state change record. */
- MVCC_TX_RECORD (LOGICAL);
+ MVCC_TX_RECORD (LOGICAL),
+
+ /** Consistent cut record. */
+ CONSISTENT_CUT;
/**
* When you're adding a new record don't forget to choose record purpose explicitly
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
index f1cfc85..582c444 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
@@ -864,6 +864,7 @@ public class CacheGroupContext {
}
/**
+
* @return {@code True} if current cache group is in recovery mode.
*/
public boolean isRecoveryMode() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 4889153..81872fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -4332,6 +4332,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
else
op = this.val == null ? GridCacheOperation.CREATE : UPDATE;
+ cctx.tm().pendingTxsTracker().onKeysWritten(tx.nearXidVersion(), Collections.singletonList(key));
+
return cctx.shared().wal().log(new DataRecord(new DataEntry(
cctx.cacheId(),
key,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index ee81b06..c606a2c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -93,6 +93,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.RebalanceReassignExchangeTask;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.StopCachesOnClientReconnectExchangeTask;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.ExchangeLatchManager;
@@ -260,6 +261,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/** Distributed latch manager. */
private ExchangeLatchManager latchMgr;
+ /** List of exchange aware components. */
+ private final List<PartitionsExchangeAware> exchangeAwareComps = new ArrayList<>();
+
/** Discovery listener. */
private final DiscoveryEventListener discoLsnr = new DiscoveryEventListener() {
@Override public void onEvent(DiscoveryEvent evt, DiscoCache cache) {
@@ -1146,6 +1150,31 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
+ * Registers component that will be notified on every partition map exchange.
+ *
+ * @param comp Component to be registered.
+ */
+ public void registerExchangeAwareComponent(PartitionsExchangeAware comp) {
+ exchangeAwareComps.add(new PartitionsExchangeAwareWrapper(comp));
+ }
+
+ /**
+ * Removes exchange aware component from list of listeners.
+ *
+ * @param comp Component to be unregistered.
+ */
+ public void unregisterExchangeAwareComponent(PartitionsExchangeAware comp) {
+ exchangeAwareComps.remove(new PartitionsExchangeAwareWrapper(comp));
+ }
+
+ /**
+ * @return List of registered exchange listeners.
+ */
+ public List<PartitionsExchangeAware> exchangeAwareComponents() {
+ return U.sealList(exchangeAwareComps);
+ }
+
+ /**
* Partition refresh callback for selected cache groups.
* For coordinator causes {@link GridDhtPartitionsFullMessage FullMessages} send,
* for non coordinator - {@link GridDhtPartitionsSingleMessage SingleMessages} send
@@ -1239,9 +1268,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
) {
long time = System.currentTimeMillis();
- GridDhtPartitionsFullMessage m = createPartitionsFullMessage(true, false, null, null, null, null, grps);
-
- m.topologyVersion(msgTopVer);
+ GridDhtPartitionsFullMessage m = createPartitionsFullMessage(true, false, null,
+ msgTopVer, null, null, null, grps);
if (log.isInfoEnabled()) {
long latency = System.currentTimeMillis() - time;
@@ -1306,13 +1334,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
boolean compress,
boolean newCntrMap,
@Nullable final GridDhtPartitionExchangeId exchId,
+ @Nullable AffinityTopologyVersion msgTopVer,
@Nullable GridCacheVersion lastVer,
@Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers,
@Nullable IgniteDhtPartitionsToReloadMap partsToReload
) {
Collection<CacheGroupContext> grps = cctx.cache().cacheGroups();
- return createPartitionsFullMessage(compress, newCntrMap, exchId, lastVer, partHistSuppliers, partsToReload, grps);
+ return createPartitionsFullMessage(compress, newCntrMap, exchId, msgTopVer, lastVer, partHistSuppliers, partsToReload, grps);
}
/**
@@ -1332,15 +1361,21 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
boolean compress,
boolean newCntrMap,
@Nullable final GridDhtPartitionExchangeId exchId,
+ @Nullable AffinityTopologyVersion msgTopVer,
@Nullable GridCacheVersion lastVer,
@Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers,
@Nullable IgniteDhtPartitionsToReloadMap partsToReload,
Collection<CacheGroupContext> grps
) {
- AffinityTopologyVersion ver = exchId != null ? exchId.topologyVersion() : AffinityTopologyVersion.NONE;
+ assert (exchId != null) ^ (msgTopVer != null): "Topology version of full map message must be specified" +
+ " either via exchangeId=[" + exchId + "], or via msgTopVer=[" + msgTopVer + "].";
- final GridDhtPartitionsFullMessage m =
- new GridDhtPartitionsFullMessage(exchId, lastVer, ver, partHistSuppliers, partsToReload);
+ final GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(exchId,
+ lastVer,
+ exchId != null ? exchId.topologyVersion() : msgTopVer,
+ partHistSuppliers,
+ partsToReload
+ );
m.compress(compress);
@@ -1396,6 +1431,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
}
+ cctx.kernalContext().txDr().onPartitionsFullMessagePrepared(exchId, m);
+
return m;
}
@@ -3614,6 +3651,73 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
+     * Wrapper that keeps exceptions thrown by a registered component's callbacks out of the exchange thread:
+     * every callback failure is logged as a warning and suppressed.
+     */
+    private class PartitionsExchangeAwareWrapper implements PartitionsExchangeAware {
+        /** Component that actually receives the exchange callbacks. */
+        private final PartitionsExchangeAware delegate;
+
+        /**
+         * Creates a new wrapper.
+         *
+         * @param delegate Component to forward the callbacks to.
+         */
+        public PartitionsExchangeAwareWrapper(PartitionsExchangeAware delegate) {
+            this.delegate = delegate;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onInitBeforeTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+            try {
+                delegate.onInitBeforeTopologyLock(fut);
+            }
+            catch (Exception e) {
+                U.warn(log, "Failed to execute exchange callback.", e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+            try {
+                delegate.onInitAfterTopologyLock(fut);
+            }
+            catch (Exception e) {
+                U.warn(log, "Failed to execute exchange callback.", e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onDoneBeforeTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+            try {
+                delegate.onDoneBeforeTopologyUnlock(fut);
+            }
+            catch (Exception e) {
+                U.warn(log, "Failed to execute exchange callback.", e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onDoneAfterTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+            try {
+                delegate.onDoneAfterTopologyUnlock(fut);
+            }
+            catch (Exception e) {
+                U.warn(log, "Failed to execute exchange callback.", e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return delegate.hashCode();
+        }
+
+        /**
+         * Compares by delegate. Another wrapper is unwrapped first: components are removed from the listener
+         * list through a freshly created wrapper, and comparing the delegate against a wrapper object would
+         * never match, leaving the component registered forever. A bare component is still compared directly.
+         *
+         * @param obj Object to compare with.
+         * @return {@code True} if the delegate equals the given (unwrapped) object.
+         */
+        @SuppressWarnings("EqualsWhichDoesntCheckParameterClass")
+        @Override public boolean equals(Object obj) {
+            Object other = obj instanceof PartitionsExchangeAwareWrapper
+                ? ((PartitionsExchangeAwareWrapper)obj).delegate
+                : obj;
+
+            return delegate.equals(other);
+        }
+    }
+
+ /**
* Class to limit action count for unique objects.
* <p>
* NO guarantees of thread safety are provided.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
index 2156d33..14f0399 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
@@ -390,7 +390,7 @@ public class WalStateManager extends GridCacheSharedManagerAdapter {
*/
public void changeLocalStatesOnExchangeDone(AffinityTopologyVersion topVer, boolean changedBaseline) {
if (changedBaseline
- && IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_PENDING_TX_TRACKER_ENABLED)
+ && cctx.tm().pendingTxsTracker().enabled()
|| !IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_DISABLE_WAL_DURING_REBALANCING, true))
return;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataHolder.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataHolder.java
index f850d34..2a9834a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataHolder.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataHolder.java
@@ -38,8 +38,8 @@ final class BinaryMetadataHolder implements Serializable {
/**
* @param metadata Metadata.
- * @param pendingVer Version of this metadata - how many updates were issued for this type.
- * @param acceptedVer Pending updates count.
+ * @param pendingVer Pending updates count.
+ * @param acceptedVer Version of this metadata - how many updates were issued for this type.
*/
BinaryMetadataHolder(BinaryMetadata metadata, int pendingVer, int acceptedVer) {
assert metadata != null;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index d3b573c..3e24968 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -775,6 +775,8 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
.map(tuple -> tuple.get1().partitionCounter(tuple.get2().updateCounter()))
.collect(Collectors.toList());
+ logKeysToPendingTxsTracker(entriesWithCounters);
+
cctx.wal().log(new DataRecord(entriesWithCounters));
}
@@ -817,6 +819,27 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
}
}
+ /**
+ * @param dataEntries Data entries whose keys are reported to the pending transactions tracker ({@code READ} entries as read keys, all others as written keys).
+ */
+ private void logKeysToPendingTxsTracker(List<DataEntry> dataEntries) {
+ for (DataEntry dataEntry : dataEntries) {
+ List<KeyCacheObject> readKeys = new ArrayList<>(); // NOTE(review): lists are created per entry, so each tracker call below carries exactly one key.
+ List<KeyCacheObject> writeKeys = new ArrayList<>();
+
+ if (dataEntry.op() == READ)
+ readKeys.add(dataEntry.key());
+ else
+ writeKeys.add(dataEntry.key());
+
+ if (!readKeys.isEmpty())
+ cctx.tm().pendingTxsTracker().onKeysRead(nearXidVersion(), readKeys);
+
+ if (!writeKeys.isEmpty())
+ cctx.tm().pendingTxsTracker().onKeysWritten(nearXidVersion(), writeKeys);
+ }
+ }
+
/** {@inheritDoc} */
@Override public final void commitRemoteTx() throws IgniteCheckedException {
if (optimistic())
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
index db097af..41e7fdd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
@@ -29,12 +29,15 @@ import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_ALL;
import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_SAFE;
import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_SAFE;
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isSystemCache;
import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter.OperationType.WRITE;
+import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DEFAULT_VOLATILE_DS_GROUP_NAME;
/**
*
@@ -93,8 +96,11 @@ public abstract class GridDhtTopologyFutureAdapter extends GridFutureAdapter<Aff
PartitionLossPolicy lossPlc = grp.config().getPartitionLossPolicy();
- if (cctx.shared().readOnlyMode() && opType == WRITE)
- return new IgniteCheckedException("Failed to perform cache operation (cluster is in read only mode)");
+ if (cctx.shared().readOnlyMode() && opType == WRITE && !isSystemCache(cctx.name())
+ && cctx.group().groupId() != CU.cacheId(DEFAULT_VOLATILE_DS_GROUP_NAME)) {
+ return new IgniteClusterReadOnlyException("Failed to perform cache operation (cluster is in read only mode) " +
+ "[cacheGrp=" + cctx.group().name() + ", cache=" + cctx.name() + ']');
+ }
if (grp.needsRecovery() && !recovery) {
if (opType == WRITE && (lossPlc == READ_ONLY_SAFE || lossPlc == READ_ONLY_ALL))
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteClusterReadOnlyException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteClusterReadOnlyException.java
new file mode 100644
index 0000000..b83eb5c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteClusterReadOnlyException.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.IgniteCheckedException;
+
+/**
+ * This exception is used to indicate that the cluster is in a read-only state.
+ */
+public class IgniteClusterReadOnlyException extends IgniteCheckedException {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Creates empty exception.
+ */
+ public IgniteClusterReadOnlyException() {
+ }
+
+ /**
+ * Creates new exception with given error message.
+ *
+ * @param msg Error message.
+ */
+ public IgniteClusterReadOnlyException(String msg) {
+ super(msg);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/ExchangeType.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/ExchangeType.java
new file mode 100644
index 0000000..f7d1e35
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/ExchangeType.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+/**
+ * Type of a partitions exchange, as reported by {@link GridDhtPartitionsExchangeFuture#exchangeType()}.
+ */
+public enum ExchangeType {
+ /** */
+ CLIENT,
+
+ /** */
+ ALL,
+
+ /** */
+ NONE
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 6b97678..47b9c0c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -318,9 +318,8 @@ public class GridDhtPartitionDemander {
metrics.clearRebalanceCounters();
for (GridDhtPartitionDemandMessage msg : assignments.values()) {
- for (Integer partId : msg.partitions().fullSet()) {
+ for (Integer partId : msg.partitions().fullSet())
metrics.onRebalancingKeysCountEstimateReceived(grp.topology().globalPartSizes().get(partId));
- }
CachePartitionPartialCountersMap histMap = msg.partitions().historicalMap();
@@ -783,6 +782,9 @@ public class GridDhtPartitionDemander {
// If message was last for this partition,
// then we take ownership.
if (last) {
+ if (ctx.kernalContext().txDr().shouldApplyUpdateCounterOnRebalance())
+ grp.offheap().onPartitionInitialCounterUpdated(p, supplyMsg.last().get(p));
+
fut.partitionDone(nodeId, p, true);
if (log.isDebugEnabled())
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 59d8463..cd3ecc8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -222,6 +222,8 @@ class GridDhtPartitionSupplier {
SupplyContext sctx = null;
+ Map<Integer, Long> initUpdateCntrs;
+
try {
synchronized (scMap) {
sctx = scMap.remove(contextId);
@@ -274,12 +276,26 @@ class GridDhtPartitionSupplier {
Set<Integer> remainingParts;
if (sctx == null || sctx.iterator == null) {
- iter = grp.offheap().rebalanceIterator(demandMsg.partitions(), demandMsg.topologyVersion());
-
remainingParts = new HashSet<>(demandMsg.partitions().fullSet());
+ Set<Integer> demandParts = new HashSet<>(demandMsg.partitions().fullSet());
+
CachePartitionPartialCountersMap histMap = demandMsg.partitions().historicalMap();
+ for (int i = 0; i < histMap.size(); ++i)
+ demandParts.add(histMap.partitionAt(i));
+
+ initUpdateCntrs = new HashMap<>(demandParts.size());
+
+ for (Integer part : demandParts) {
+ GridDhtLocalPartition loc = top.localPartition(part, demandMsg.topologyVersion(), false);
+
+ if (loc != null && loc.state() == GridDhtPartitionState.OWNING)
+ initUpdateCntrs.put(part, loc.updateCounter());
+ }
+
+ iter = grp.offheap().rebalanceIterator(demandMsg.partitions(), demandMsg.topologyVersion());
+
for (int i = 0; i < histMap.size(); i++) {
int p = histMap.partitionAt(i);
@@ -311,6 +327,8 @@ class GridDhtPartitionSupplier {
iter = sctx.iterator;
remainingParts = sctx.remainingParts;
+
+ initUpdateCntrs = sctx.initUpdateCntrs;
}
final int msgMaxSize = grp.config().getRebalanceBatchSize();
@@ -332,7 +350,8 @@ class GridDhtPartitionSupplier {
saveSupplyContext(contextId,
iter,
remainingParts,
- demandMsg.rebalanceId()
+ demandMsg.rebalanceId(),
+ initUpdateCntrs
);
reply(topicId, demanderNode, demandMsg, supplyMsg, contextId);
@@ -393,7 +412,7 @@ class GridDhtPartitionSupplier {
}
if (iter.isPartitionDone(part)) {
- supplyMsg.last(part, loc.updateCounter());
+ supplyMsg.last(part, initUpdateCntrs.get(part));
remainingParts.remove(part);
@@ -595,17 +614,19 @@ class GridDhtPartitionSupplier {
* @param entryIt Entries rebalance iterator.
* @param remainingParts Set of partitions that weren't sent yet.
* @param rebalanceId Rebalance id.
+ * @param initUpdateCntrs Collection of update counters that corresponds to the beginning of rebalance.
*/
private void saveSupplyContext(
T3<UUID, Integer, AffinityTopologyVersion> contextId,
IgniteRebalanceIterator entryIt,
Set<Integer> remainingParts,
- long rebalanceId
+ long rebalanceId,
+ Map<Integer, Long> initUpdateCntrs
) {
synchronized (scMap) {
assert scMap.get(contextId) == null;
- scMap.put(contextId, new SupplyContext(entryIt, remainingParts, rebalanceId));
+ scMap.put(contextId, new SupplyContext(entryIt, remainingParts, rebalanceId, initUpdateCntrs));
}
}
@@ -623,17 +644,27 @@ class GridDhtPartitionSupplier {
/** Rebalance id. */
private final long rebalanceId;
+ /** Update counters for rebalanced partitions. */
+ private final Map<Integer, Long> initUpdateCntrs;
+
/**
* Constructor.
*
* @param iterator Entries rebalance iterator.
* @param remainingParts Set of partitions which weren't sent yet.
* @param rebalanceId Rebalance id.
+ * @param initUpdateCntrs Collection of update counters that corresponds to the beginning of rebalance.
*/
- SupplyContext(IgniteRebalanceIterator iterator, Set<Integer> remainingParts, long rebalanceId) {
+ SupplyContext(
+ IgniteRebalanceIterator iterator,
+ Set<Integer> remainingParts,
+ long rebalanceId,
+ Map<Integer, Long> initUpdateCntrs
+ ) {
this.iterator = iterator;
this.remainingParts = remainingParts;
this.rebalanceId = rebalanceId;
+ this.initUpdateCntrs = initUpdateCntrs;
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 4e223c4..a819b8b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -104,6 +104,7 @@ import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
import org.apache.ignite.internal.processors.service.GridServiceProcessor;
+import org.apache.ignite.internal.processors.txdr.TransactionalDrProcessor;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.TimeBag;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -204,6 +205,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/** */
private AtomicBoolean added = new AtomicBoolean(false);
+ /** Exchange type. */
+ private volatile ExchangeType exchangeType;
+
/**
* Discovery event receive latch. There is a race between discovery event processing and single message
* processing, so it is possible to create an exchange future before the actual discovery event is received.
@@ -496,6 +500,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
/**
+ * @return Exchange type or {@code null} if not determined yet.
+ */
+ public ExchangeType exchangeType() {
+ return exchangeType;
+ }
+
+ /**
* Retreives the node which has WAL history since {@code cntrSince}.
*
* @param grpId Cache group ID.
@@ -852,6 +863,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
cctx.cache().registrateProxyRestart(resolveCacheRequests(exchActions), afterLsnrCompleteFut);
+ exchangeType = exchange;
+
+ for (PartitionsExchangeAware comp : cctx.exchange().exchangeAwareComponents())
+ comp.onInitBeforeTopologyLock(this);
+
updateTopologies(crdNode);
timeBag.finishGlobalStage("Determine exchange type");
@@ -899,6 +915,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
}
+ for (PartitionsExchangeAware comp : cctx.exchange().exchangeAwareComponents())
+ comp.onInitAfterTopologyLock(this);
+
if (exchLog.isInfoEnabled())
exchLog.info("Finished exchange init [topVer=" + topVer + ", crd=" + crdNode + ']');
}
@@ -1185,6 +1204,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
cctx.exchange().exchangerBlockingSectionBegin();
try {
+ kctx.txDr().onDeActivate(cctx.kernalContext());
+
kctx.dataStructures().onDeActivate(kctx);
if (cctx.kernalContext().service() instanceof GridServiceProcessor)
@@ -1938,6 +1959,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
compress,
newCntrMap,
exchangeId(),
+ null,
last != null ? last : cctx.versions().last(),
partHistSuppliers,
partsToReload);
@@ -2207,6 +2229,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
if (err == null)
cctx.coordinators().onExchangeDone(events().discoveryCache());
+ for (PartitionsExchangeAware comp : cctx.exchange().exchangeAwareComponents())
+ comp.onDoneBeforeTopologyUnlock(this);
+
// Create and destory caches and cache proxies.
cctx.cache().onExchangeDone(initialVersion(), exchActions, err);
@@ -2308,6 +2333,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
}
+ for (PartitionsExchangeAware comp : cctx.exchange().exchangeAwareComponents())
+ comp.onDoneAfterTopologyUnlock(this);
+
if (firstDiscoEvt instanceof DiscoveryCustomEvent)
((DiscoveryCustomEvent)firstDiscoEvt).customMessage(null);
@@ -3150,7 +3178,19 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
top.globalPartSizes(partSizes);
- Map<UUID, Set<Integer>> partitionsToRebalance = top.resetOwners(ownersByUpdCounters, haveHistory);
+ TransactionalDrProcessor txDrProc = cctx.kernalContext().txDr();
+
+ boolean skipResetOwners = txDrProc != null && txDrProc.shouldIgnoreAssignPartitionStates(this);
+
+ Set<UUID> joinedNodes = new HashSet<>();
+
+ for (DiscoveryEvent evt : events().events()) {
+ if (evt.type() == EVT_NODE_JOINED)
+ joinedNodes.add(evt.eventNode().id());
+ }
+
+ Map<UUID, Set<Integer>> partitionsToRebalance = top.resetOwners(
+ ownersByUpdCounters, haveHistory, joinedNodes, skipResetOwners);
for (Map.Entry<UUID, Set<Integer>> e : partitionsToRebalance.entrySet()) {
UUID nodeId = e.getKey();
@@ -3413,6 +3453,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
});
}
+ TransactionalDrProcessor txDrProc = cctx.kernalContext().txDr();
+
+ boolean skipResetOwners = txDrProc != null && txDrProc.shouldIgnoreAssignPartitionStates(this);
+
timeBag.finishGlobalStage("Affinity recalculation (crd)");
Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = new ConcurrentHashMap<>(cctx.cache().cacheGroups().size());
@@ -3469,9 +3513,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
timeBag.finishGlobalStage("Ideal affinity diff calculation (enforced)");
}
- for (CacheGroupContext grpCtx : cctx.cache().cacheGroups()) {
- if (!grpCtx.isLocal())
- grpCtx.topology().applyUpdateCounters();
+ if (!skipResetOwners) {
+ for (CacheGroupContext grpCtx : cctx.cache().cacheGroups()) {
+ if (!grpCtx.isLocal())
+ grpCtx.topology().applyUpdateCounters();
+ }
}
timeBag.finishGlobalStage("Apply update counters");
@@ -3639,6 +3685,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
GridDhtPartitionsSingleMessage msg,
Map<Integer, CacheGroupAffinityMessage> messageAccumulator
) {
+ TransactionalDrProcessor txDrProc = cctx.kernalContext().txDr();
+
+ boolean skipResetOwners = txDrProc != null && txDrProc.shouldIgnoreAssignPartitionStates(this);
+
for (Map.Entry<Integer, GridDhtPartitionMap> e : msg.partitions().entrySet()) {
Integer grpId = e.getKey();
@@ -3650,7 +3700,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
CachePartitionPartialCountersMap cntrs = msg.partitionUpdateCounters(grpId, top.partitions());
- if (cntrs != null)
+ if (cntrs != null && !skipResetOwners)
top.collectUpdateCounters(cntrs);
}
@@ -5034,20 +5084,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/**
*
*/
- enum ExchangeType {
- /** */
- CLIENT,
-
- /** */
- ALL,
-
- /** */
- NONE
- }
-
- /**
- *
- */
private enum ExchangeLocalState {
/** Local node is coordinator. */
CRD,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 5181e90..2c99314 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.txdr.TransactionalDrProcessor;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -182,6 +183,11 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
if (exchFut.localJoinExchange())
return true; // Required, can have outdated updSeq partition counter if node reconnects.
+ TransactionalDrProcessor txDrProc = ctx.kernalContext().txDr();
+
+ if (txDrProc != null && txDrProc.shouldScheduleRebalance(exchFut))
+ return true;
+
if (!grp.affinity().cachedVersions().contains(rebTopVer)) {
assert rebTopVer.compareTo(grp.localStartVersion()) <= 0 :
"Empty hisroty allowed only for newly started cache group [rebTopVer=" + rebTopVer +
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionsExchangeAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionsExchangeAware.java
new file mode 100644
index 0000000..ffe5766
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionsExchangeAware.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+/**
+ * Callbacks that {@link GridDhtPartitionsExchangeFuture} invokes on every component returned by the exchange manager's {@code exchangeAwareComponents()}; each callback is a no-op by default.
+ */
+public interface PartitionsExchangeAware {
+ public default void onInitBeforeTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+ // No-op.
+ }
+
+ public default void onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+ // No-op.
+ }
+
+ public default void onDoneBeforeTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+ // No-op.
+ }
+
+ public default void onDoneAfterTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+ // No-op.
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
index e9b8b6b..e3d06c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
@@ -1161,7 +1161,12 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @Override public Map<UUID, Set<Integer>> resetOwners(Map<Integer, Set<UUID>> ownersByUpdCounters, Set<Integer> haveHistory) {
+ @Override public Map<UUID, Set<Integer>> resetOwners(
+ Map<Integer, Set<UUID>> ownersByUpdCounters,
+ Set<Integer> haveHistory,
+ Set<UUID> joinedNodes,
+ boolean skipResetOwners
+ ) {
Map<UUID, Set<Integer>> result = new HashMap<>();
lock.writeLock().lock();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
index 890693b..61a5408 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
@@ -423,9 +423,15 @@ public interface GridDhtPartitionTopology {
* @param ownersByUpdCounters Map (partition, set of node IDs that have most actual state about partition
* (update counter is maximal) and should hold OWNING state for such partition).
* @param haveHistory Set of partitions which have WAL history to rebalance.
+ * @param joinedNodes Set of nodes that joined topology in case of distributed exchange.
+ * @param skipResetOwners If true, resetting owners will be skipped for all nodes except joined ones.
* @return Map (nodeId, set of partitions that should be rebalanced <b>fully</b> by this node).
*/
- public Map<UUID, Set<Integer>> resetOwners(Map<Integer, Set<UUID>> ownersByUpdCounters, Set<Integer> haveHistory);
+ public Map<UUID, Set<Integer>> resetOwners(
+ Map<Integer, Set<UUID>> ownersByUpdCounters,
+ Set<Integer> haveHistory,
+ Set<UUID> joinedNodes,
+ boolean skipResetOwners);
/**
* Callback on exchange done.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
index aed494d..fbddb1d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
@@ -882,8 +882,12 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
try {
GridDhtLocalPartition part = locParts.get(p);
- if (part != null && part.state() != EVICTED)
- return part;
+ if (part != null) {
+ if (part.state() != EVICTED)
+ return part;
+ else
+ part.awaitDestroy();
+ }
part = new GridDhtLocalPartition(ctx, grp, p, true);
@@ -2187,7 +2191,12 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @Override public Map<UUID, Set<Integer>> resetOwners(Map<Integer, Set<UUID>> ownersByUpdCounters, Set<Integer> haveHistory) {
+ @Override public Map<UUID, Set<Integer>> resetOwners(
+ Map<Integer, Set<UUID>> ownersByUpdCounters,
+ Set<Integer> haveHistory,
+ Set<UUID> joinedNodes,
+ boolean skipResetOwners
+ ) {
Map<UUID, Set<Integer>> result = new HashMap<>();
ctx.database().checkpointReadLock();
@@ -2197,6 +2206,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
try {
// First process local partitions.
+ UUID locNodeId = ctx.localNodeId();
+
for (Map.Entry<Integer, Set<UUID>> entry : ownersByUpdCounters.entrySet()) {
int part = entry.getKey();
Set<UUID> newOwners = entry.getValue();
@@ -2206,11 +2217,11 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (locPart == null || locPart.state() != OWNING)
continue;
- if (!newOwners.contains(ctx.localNodeId())) {
+ if ((joinedNodes.contains(locNodeId) || !skipResetOwners) && !newOwners.contains(locNodeId)) {
rebalancePartition(part, haveHistory.contains(part));
- result.computeIfAbsent(ctx.localNodeId(), n -> new HashSet<>());
- result.get(ctx.localNodeId()).add(part);
+ result.computeIfAbsent(locNodeId, n -> new HashSet<>());
+ result.get(locNodeId).add(part);
}
}
@@ -2221,6 +2232,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
for (Map.Entry<UUID, GridDhtPartitionMap> remotes : node2part.entrySet()) {
UUID remoteNodeId = remotes.getKey();
+
+ if (!joinedNodes.contains(remoteNodeId) && skipResetOwners)
+ continue;
+
GridDhtPartitionMap partMap = remotes.getValue();
GridDhtPartitionState state = partMap.get(part);
@@ -2233,7 +2248,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
partMap.updateSequence(partMap.updateSequence() + 1, partMap.topologyVersion());
- if (partMap.nodeId().equals(ctx.localNodeId()))
+ if (partMap.nodeId().equals(locNodeId))
updateSeq.setIfGreater(partMap.updateSequence());
result.computeIfAbsent(remoteNodeId, n -> new HashSet<>());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
index 0b75dfc..12793da 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.internal.processors.cache.GridCacheCompoundFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index a300447..7251794 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -45,6 +45,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
@@ -59,6 +60,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Predicate;
@@ -121,6 +123,7 @@ import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.ExchangeActions;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
@@ -174,6 +177,7 @@ import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.mxbean.DataStorageMetricsMXBean;
@@ -224,7 +228,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
private final int walRebalanceThreshold = getInteger(IGNITE_PDS_WAL_REBALANCE_THRESHOLD, 500_000);
/** Value of property for throttling policy override. */
- private final String throttlingPolicyOverride = IgniteSystemProperties.getString(
+ private final String throttlingPlcOverride = IgniteSystemProperties.getString(
IgniteSystemProperties.IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED);
/** */
@@ -270,6 +274,12 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/** */
private static final String CHECKPOINT_RUNNER_THREAD_PREFIX = "checkpoint-runner";
+ /** Throttle logging threshold. Warning will be raised if thread is being parked for this long. */
+ private static final long THROTTLE_LOGGING_THRESHOLD = TimeUnit.SECONDS.toNanos(5);
+
+ /** Throttle queue size threshold. Async applying will be throttled starting from this queue size. */
+ private static final int THROTTLE_QUEUE_SIZE_THRESHOLD = 10_000;
+
/** This number of threads will be created and used for parallel sorting. */
private static final int PARALLEL_SORT_THREADS = Math.min(Runtime.getRuntime().availableProcessors(), 8);
@@ -292,7 +302,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
private long checkpointFreq;
/** */
- private CheckpointHistory cpHistory;
+ private CheckpointHistory cpHist;
/** */
private FilePageStoreManager storeMgr;
@@ -378,7 +388,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
private Collection<Integer> initiallyGlobalWalDisabledGrps = new HashSet<>();
/** Initially local wal disabled groups. */
- private Collection<Integer> initiallyLocalWalDisabledGrps = new HashSet<>();
+ private Collection<Integer> initiallyLocWalDisabledGrps = new HashSet<>();
/** File I/O factory for writing checkpoint markers. */
private final FileIOFactory ioFactory;
@@ -536,7 +546,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
checkpointer = new Checkpointer(cctx.igniteInstanceName(), "db-checkpoint-thread", log);
- cpHistory = new CheckpointHistory(kernalCtx);
+ cpHist = new CheckpointHistory(kernalCtx);
IgnitePageStoreManager store = cctx.pageStore();
@@ -629,8 +639,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/** {@inheritDoc} */
@Override public void cleanupCheckpointDirectory() throws IgniteCheckedException {
- if (cpHistory != null)
- cpHistory = new CheckpointHistory(cctx.kernalContext());
+ if (cpHist != null)
+ cpHist = new CheckpointHistory(cctx.kernalContext());
try {
try (DirectoryStream<Path> files = Files.newDirectoryStream(cpDir.toPath())) {
@@ -1139,17 +1149,17 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
GridInClosure3X<Long, FullPageId, PageMemoryEx> changeTracker;
- if (trackable)
+ if (trackable) {
changeTracker = new GridInClosure3X<Long, FullPageId, PageMemoryEx>() {
@Override public void applyx(
Long page,
FullPageId fullId,
PageMemoryEx pageMem
) throws IgniteCheckedException {
- if (trackable)
- snapshotMgr.onChangeTrackerPage(page, fullId, pageMem);
+ snapshotMgr.onChangeTrackerPage(page, fullId, pageMem);
}
};
+ }
else
changeTracker = null;
@@ -1241,13 +1251,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
? PageMemoryImpl.ThrottlingPolicy.SPEED_BASED
: PageMemoryImpl.ThrottlingPolicy.CHECKPOINT_BUFFER_ONLY;
- if (throttlingPolicyOverride != null) {
+ if (throttlingPlcOverride != null) {
try {
- plc = PageMemoryImpl.ThrottlingPolicy.valueOf(throttlingPolicyOverride.toUpperCase());
+ plc = PageMemoryImpl.ThrottlingPolicy.valueOf(throttlingPlcOverride.toUpperCase());
}
catch (IllegalArgumentException e) {
log.error("Incorrect value of IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED property. " +
- "The default throttling policy will be used [plc=" + throttlingPolicyOverride +
+ "The default throttling policy will be used [plc=" + throttlingPlcOverride +
", defaultPlc=" + plc + ']');
}
}
@@ -1731,7 +1741,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
checkpointReadLock();
try {
- earliestValidCheckpoints = cpHistory.searchAndReserveCheckpoints(applicableGroupsAndPartitions);
+ earliestValidCheckpoints = cpHist.searchAndReserveCheckpoints(applicableGroupsAndPartitions);
}
finally {
checkpointReadUnlock();
@@ -1817,7 +1827,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/** {@inheritDoc} */
@Override public boolean reserveHistoryForPreloading(int grpId, int partId, long cntr) {
- CheckpointEntry cpEntry = cpHistory.searchCheckpointEntry(grpId, partId, cntr);
+ CheckpointEntry cpEntry = cpHist.searchCheckpointEntry(grpId, partId, cntr);
if (cpEntry == null)
return false;
@@ -1898,7 +1908,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/** {@inheritDoc} */
@Override public WALPointer lastCheckpointMarkWalPointer() {
- CheckpointEntry lastCheckpointEntry = cpHistory == null ? null : cpHistory.lastCheckpoint();
+ CheckpointEntry lastCheckpointEntry = cpHist == null ? null : cpHist.lastCheckpoint();
return lastCheckpointEntry == null ? null : lastCheckpointEntry.checkpointMark();
}
@@ -2380,7 +2390,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
finalizeCheckpointOnRecovery(status.cpStartTs, status.cpStartId, status.startPtr, exec);
}
- cpHistory.initialize(retreiveHistory());
+ cpHist.initialize(retreiveHistory());
return restoreBinaryState;
}
@@ -2596,52 +2606,81 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/**
* Apply update from some iterator and with specific filters.
*
- * @param it WalIterator.
- * @param recPredicate Wal record filter.
- * @param entryPredicate Entry filter.
+ * @param it WAL iterator.
+ * @param stopPred WAL record stop predicate.
+ * @param entryPred Entry filter.
*/
public void applyUpdatesOnRecovery(
@Nullable WALIterator it,
- IgniteBiPredicate<WALPointer, WALRecord> recPredicate,
- IgnitePredicate<DataEntry> entryPredicate
+ IgniteBiPredicate<WALPointer, WALRecord> stopPred,
+ IgniteBiPredicate<WALRecord, DataEntry> entryPred
) throws IgniteCheckedException {
if (it == null)
return;
cctx.walState().runWithOutWAL(() -> {
- while (it.hasNext()) {
- IgniteBiTuple<WALPointer, WALRecord> next = it.next();
+ if (it != null)
+ applyUpdates(it, stopPred, entryPred, false, null, false);
+ });
+ }
- WALRecord rec = next.get2();
+ /**
+ * Applies data entry updates from the given WAL iterator for entries that satisfy the provided predicates.
+ * @param it WAL iterator.
+ * @param stopPred WAL record predicate for stopping iteration.
+ * @param entryPred Data record and corresponding data entries predicate.
+ * @param lockEntries If true, update will be performed under entry lock.
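+ * @param onWalPointerApplied Listener invoked with the position of every iterated WAL record after it has been applied, submitted for asynchronous applying, or skipped; may be {@code null}.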
+ * @param asyncApply If true, updates applying will be delegated to the striped executor.
+ */
+ public void applyUpdates(
+ WALIterator it,
+ @Nullable IgniteBiPredicate<WALPointer, WALRecord> stopPred,
+ IgniteBiPredicate<WALRecord, DataEntry> entryPred,
+ boolean lockEntries,
+ IgniteInClosure<WALPointer> onWalPointerApplied,
+ boolean asyncApply
+ ) {
+ StripedExecutor exec = cctx.kernalContext().getStripedExecutorService();
- if (!recPredicate.apply(next.get1(), rec))
- break;
+ AtomicReference<IgniteCheckedException> applyError = new AtomicReference<>();
- switch (rec.type()) {
- case MVCC_DATA_RECORD:
- case DATA_RECORD:
+ int[] stripesThrottleAccumulator = new int[exec.stripes()];
+
+ while (it.hasNext()) {
+ IgniteBiTuple<WALPointer, WALRecord> next = it.next();
+
+ WALRecord rec = next.get2();
+
+ if (stopPred != null && stopPred.apply(next.get1(), rec))
+ break;
+
+ switch (rec.type()) {
+ case MVCC_DATA_RECORD:
+ case DATA_RECORD:
+ if (entryPred.apply(rec, null)) {
checkpointReadLock();
try {
DataRecord dataRec = (DataRecord)rec;
for (DataEntry dataEntry : dataRec.writeEntries()) {
- if (entryPredicate.apply(dataEntry)) {
- checkpointReadLock();
-
- try {
- int cacheId = dataEntry.cacheId();
+ if (entryPred.apply(rec, dataEntry)) {
+ int cacheId = dataEntry.cacheId();
- GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+ GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
- if (cacheCtx != null)
- applyUpdate(cacheCtx, dataEntry);
- else if (log != null)
- log.warning("Cache is not started. Updates cannot be applied " +
- "[cacheId=" + cacheId + ']');
+ if (cacheCtx != null) {
+ if (asyncApply) {
+ applyUpdateAsync(cacheCtx, dataEntry, lockEntries, exec,
+ applyError, stripesThrottleAccumulator);
+ }
+ else
+ applyUpdate(cacheCtx, dataEntry, lockEntries);
}
- finally {
- checkpointReadUnlock();
+ else if (log != null) {
+ log.warning("Cache is not started. Updates cannot be applied " +
+ "[cacheId=" + cacheId + ']');
}
}
}
@@ -2652,10 +2691,12 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
finally {
checkpointReadUnlock();
}
+ }
- break;
+ break;
- case MVCC_TX_RECORD:
+ case MVCC_TX_RECORD:
+ if (entryPred.apply(rec, null)) {
checkpointReadLock();
try {
@@ -2668,12 +2709,96 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
finally {
checkpointReadUnlock();
}
+ }
- break;
+ break;
- default:
- // Skip other records.
+ default:
+ // Skip other records.
+ }
+
+ if (onWalPointerApplied != null)
+ onWalPointerApplied.apply(rec.position());
+ }
+
+ if (applyError.get() != null)
+ throw new IgniteException(applyError.get()); // Fail-fast check.
+ else {
+ CountDownLatch stripesClearLatch = new CountDownLatch(exec.stripes());
+
+ // We have to ensure that all asynchronous updates are done.
+ // StripedExecutor guarantees ordering inside a stripe - it is enough to await the "finishing" tasks.
+ for (int i = 0; i < exec.stripes(); i++)
+ exec.execute(i, stripesClearLatch::countDown);
+
+ try {
+ stripesClearLatch.await();
+ }
+ catch (InterruptedException e) {
+ throw new IgniteInterruptedException(e);
+ }
+ }
+
+ if (applyError.get() != null)
+ throw new IgniteException(applyError.get());
+ }
+
+ /**
+ * Applies update in given striped executor.
+ *
+ * @param cacheCtx Cache context.
+ * @param dataEntry Data entry.
+ * @param lockEntries Lock entries.
+ * @param exec Executor.
+ * @param applyError Reference holding the first error raised by an asynchronous update; if already set, it is rethrown.
+ * @param stripesThrottleAccumulator Array with accumulated throttle powers for each stripe.
+ */
+ private void applyUpdateAsync(
+ GridCacheContext cacheCtx,
+ DataEntry dataEntry,
+ boolean lockEntries,
+ StripedExecutor exec,
+ AtomicReference<IgniteCheckedException> applyError,
+ int[] stripesThrottleAccumulator
+ ) throws IgniteCheckedException {
+ if (applyError.get() != null)
+ throw applyError.get();
+
+ int stripeIdx = dataEntry.partitionId() % exec.stripes();
+
+ assert stripeIdx >= 0 : "Stripe index should be non-negative: " + stripeIdx;
+
+ if (exec.queueSize(stripeIdx) > THROTTLE_QUEUE_SIZE_THRESHOLD) {
+ int throttlePower = ++stripesThrottleAccumulator[stripeIdx];
+
+ long throttleParkTimeNs = (long)(1000L * Math.pow(1.05, throttlePower));
+
+ if (throttleParkTimeNs > THROTTLE_LOGGING_THRESHOLD) {
+ U.warn(log, "Parking thread=" + Thread.currentThread().getName()
+ + " for timeout(ms)=" + (throttleParkTimeNs / 1_000_000));
+ }
+
+ LockSupport.parkNanos(throttleParkTimeNs);
+ }
+ else
+ stripesThrottleAccumulator[stripeIdx] = 0;
+
+ exec.execute(stripeIdx, () -> {
+ try {
+ if (applyError.get() != null)
+ return;
+
+ checkpointReadLock();
+
+ try {
+ applyUpdate(cacheCtx, dataEntry, lockEntries);
}
+ finally {
+ checkpointReadUnlock();
+ }
+ }
+ catch (IgniteCheckedException e) {
+ applyError.compareAndSet(null, e);
}
});
}
@@ -2741,7 +2866,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
cctx.kernalContext().query().markAsRebuildNeeded(cacheCtx);
try {
- applyUpdate(cacheCtx, dataEntry);
+ applyUpdate(cacheCtx, dataEntry, false);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to apply data entry, dataEntry=" + dataEntry +
@@ -2859,18 +2984,23 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
* @param highBound WALPointer.
*/
public void onWalTruncated(WALPointer highBound) throws IgniteCheckedException {
- List<CheckpointEntry> removedFromHistory = cpHistory.onWalTruncated(highBound);
+ List<CheckpointEntry> rmvFromHist = cpHist.onWalTruncated(highBound);
- for (CheckpointEntry cp : removedFromHistory)
+ for (CheckpointEntry cp : rmvFromHist)
removeCheckpointFiles(cp);
}
/**
* @param cacheCtx Cache context to apply an update.
* @param dataEntry Data entry to apply.
+ * @param lockEntry If true, update will be performed under entry lock.
* @throws IgniteCheckedException If failed to restore.
*/
- private void applyUpdate(GridCacheContext cacheCtx, DataEntry dataEntry) throws IgniteCheckedException {
+ private void applyUpdate(
+ GridCacheContext cacheCtx,
+ DataEntry dataEntry,
+ boolean lockEntry
+ ) throws IgniteCheckedException {
int partId = dataEntry.partitionId();
if (partId == -1)
@@ -2878,51 +3008,85 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
GridDhtLocalPartition locPart = cacheCtx.isLocal() ? null : cacheCtx.topology().forceCreatePartition(partId);
+ GridCacheEntryEx entryEx = null;
+
switch (dataEntry.op()) {
case CREATE:
case UPDATE:
- if (dataEntry instanceof MvccDataEntry) {
- cacheCtx.offheap().mvccApplyUpdate(
- cacheCtx,
- dataEntry.key(),
- dataEntry.value(),
- dataEntry.writeVersion(),
- dataEntry.expireTime(),
- locPart,
- ((MvccDataEntry)dataEntry).mvccVer());
+ if (lockEntry) {
+ entryEx = cacheCtx.isNear() ? cacheCtx.near().dht().entryEx(dataEntry.key()) :
+ cacheCtx.cache().entryEx(dataEntry.key());
+
+ entryEx.lockEntry();
}
- else {
- cacheCtx.offheap().update(
- cacheCtx,
- dataEntry.key(),
- dataEntry.value(),
- dataEntry.writeVersion(),
- dataEntry.expireTime(),
- locPart,
- null);
+
+ try {
+ if (dataEntry instanceof MvccDataEntry) {
+ cacheCtx.offheap().mvccApplyUpdate(
+ cacheCtx,
+ dataEntry.key(),
+ dataEntry.value(),
+ dataEntry.writeVersion(),
+ dataEntry.expireTime(),
+ locPart,
+ ((MvccDataEntry)dataEntry).mvccVer());
+ }
+ else {
+ cacheCtx.offheap().update(
+ cacheCtx,
+ dataEntry.key(),
+ dataEntry.value(),
+ dataEntry.writeVersion(),
+ dataEntry.expireTime(),
+ locPart,
+ null);
+ }
+
+ if (dataEntry.partitionCounter() != 0)
+ cacheCtx.offheap().onPartitionInitialCounterUpdated(partId, dataEntry.partitionCounter());
}
+ finally {
+ if (lockEntry) {
+ entryEx.unlockEntry();
- if (dataEntry.partitionCounter() != 0)
- cacheCtx.offheap().onPartitionInitialCounterUpdated(partId, dataEntry.partitionCounter());
+ entryEx.context().evicts().touch(entryEx);
+ }
+ }
break;
case DELETE:
- if (dataEntry instanceof MvccDataEntry) {
- cacheCtx.offheap().mvccApplyUpdate(
- cacheCtx,
- dataEntry.key(),
- null,
- dataEntry.writeVersion(),
- 0L,
- locPart,
- ((MvccDataEntry)dataEntry).mvccVer());
+ if (lockEntry) {
+ entryEx = cacheCtx.isNear() ? cacheCtx.near().dht().entryEx(dataEntry.key()) :
+ cacheCtx.cache().entryEx(dataEntry.key());
+
+ entryEx.lockEntry();
}
- else
- cacheCtx.offheap().remove(cacheCtx, dataEntry.key(), partId, locPart);
- if (dataEntry.partitionCounter() != 0)
- cacheCtx.offheap().onPartitionInitialCounterUpdated(partId, dataEntry.partitionCounter());
+ try {
+ if (dataEntry instanceof MvccDataEntry) {
+ cacheCtx.offheap().mvccApplyUpdate(
+ cacheCtx,
+ dataEntry.key(),
+ null,
+ dataEntry.writeVersion(),
+ 0L,
+ locPart,
+ ((MvccDataEntry)dataEntry).mvccVer());
+ }
+ else
+ cacheCtx.offheap().remove(cacheCtx, dataEntry.key(), partId, locPart);
+
+ if (dataEntry.partitionCounter() != 0)
+ cacheCtx.offheap().onPartitionInitialCounterUpdated(partId, dataEntry.partitionCounter());
+ }
+ finally {
+ if (lockEntry) {
+ entryEx.unlockEntry();
+
+ entryEx.context().evicts().touch(entryEx);
+ }
+ }
break;
@@ -3227,7 +3391,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
// Do not hold groups state in-memory if there is no space in the checkpoint history to prevent possible OOM.
// In this case the actual group states will be read from WAL on demand.
- if (rec != null && cpHistory.hasSpace())
+ if (rec != null && cpHist.hasSpace())
cacheGrpStates = rec.cacheGroupStates();
return new CheckpointEntry(cpTs, ptr, cpId, cacheGrpStates);
@@ -3237,7 +3401,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
* @return Checkpoint history.
*/
@Nullable public CheckpointHistory checkpointHistory() {
- return cpHistory;
+ return cpHist;
}
/**
@@ -4038,7 +4202,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
cpRec,
CheckpointEntryType.START);
- cpHistory.addCheckpoint(cp);
+ cpHist.addCheckpoint(cp);
}
}
finally {
@@ -4375,7 +4539,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
cctx.wal().notchLastCheckpointPtr(chp.cpEntry.checkpointMark());
}
- List<CheckpointEntry> removedFromHistory = cpHistory.onCheckpointFinished(chp, truncateWalOnCpFinish);
+ List<CheckpointEntry> removedFromHistory = cpHist.onCheckpointFinished(chp, truncateWalOnCpFinish);
for (CheckpointEntry cp : removedFromHistory)
removeCheckpointFiles(cp);
@@ -5166,7 +5330,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/** {@inheritDoc} */
@Override public boolean walEnabled(int grpId, boolean local) {
if (local)
- return !initiallyLocalWalDisabledGrps.contains(grpId);
+ return !initiallyLocWalDisabledGrps.contains(grpId);
else
return !initiallyGlobalWalDisabledGrps.contains(grpId);
}
@@ -5216,7 +5380,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
checkpointReadLock();
try {
- CheckpointEntry lastCp = cpHistory.lastCheckpoint();
+ CheckpointEntry lastCp = cpHist.lastCheckpoint();
long lastCpTs = lastCp != null ? lastCp.timestamp() : 0;
if (lastCpTs != 0)
@@ -5242,7 +5406,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
if (t2 != null) {
if (t2.get2())
- initiallyLocalWalDisabledGrps.add(t2.get1());
+ initiallyLocWalDisabledGrps.add(t2.get1());
else
initiallyGlobalWalDisabledGrps.add(t2.get1());
}
@@ -5412,7 +5576,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
*/
private IgnitePredicate<Integer> groupsWithEnabledWal() {
return groupId -> !initiallyGlobalWalDisabledGrps.contains(groupId)
- && !initiallyLocalWalDisabledGrps.contains(groupId);
+ && !initiallyLocWalDisabledGrps.contains(groupId);
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
index d583a04..ad1bc91 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
@@ -452,11 +452,11 @@ public class MetaStorage implements DbCheckpointListener, ReadWriteMetastorage {
/** */
private void checkRootsPageIdFlag(long treeRoot, long reuseListRoot) throws StorageException {
- if (PageIdUtils.flag(treeRoot) != PageMemory.FLAG_DATA)
+ if (PageIdUtils.flag(treeRoot) != FLAG_DATA)
throw new StorageException("Wrong tree root page id flag: treeRoot="
+ U.hexLong(treeRoot) + ", METASTORAGE_CACHE_ID=" + METASTORAGE_CACHE_ID);
- if (PageIdUtils.flag(reuseListRoot) != PageMemory.FLAG_DATA)
+ if (PageIdUtils.flag(reuseListRoot) != FLAG_DATA)
throw new StorageException("Wrong reuse list root page id flag: reuseListRoot="
+ U.hexLong(reuseListRoot) + ", METASTORAGE_CACHE_ID=" + METASTORAGE_CACHE_ID);
}
@@ -515,11 +515,11 @@ public class MetaStorage implements DbCheckpointListener, ReadWriteMetastorage {
//MetaStorage never encrypted so realPageSize == pageSize.
io.initNewPage(pageAddr, partMetaId, pageMem.pageSize());
- treeRoot = pageMem.allocatePage(METASTORAGE_CACHE_ID, partId, PageMemory.FLAG_DATA);
- reuseListRoot = pageMem.allocatePage(METASTORAGE_CACHE_ID, partId, PageMemory.FLAG_DATA);
+ treeRoot = pageMem.allocatePage(METASTORAGE_CACHE_ID, partId, FLAG_DATA);
+ reuseListRoot = pageMem.allocatePage(METASTORAGE_CACHE_ID, partId, FLAG_DATA);
- assert PageIdUtils.flag(treeRoot) == PageMemory.FLAG_DATA;
- assert PageIdUtils.flag(reuseListRoot) == PageMemory.FLAG_DATA;
+ assert PageIdUtils.flag(treeRoot) == FLAG_DATA;
+ assert PageIdUtils.flag(reuseListRoot) == FLAG_DATA;
io.setTreeRoot(pageAddr, treeRoot);
io.setReuseListRoot(pageAddr, reuseListRoot);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
index f1a26be..a86cc25 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
@@ -1017,7 +1017,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
throw new IgniteCheckedException("Runtime failure on bounds: [lower=" + lower + ", upper=" + upper + "]", e);
}
catch (RuntimeException | AssertionError e) {
- throw new CorruptedTreeException("Runtime failure on bounds: [lower=" + lower + ", upper=" + upper + "]", e);
+ throw createCorruptedTreeException("Runtime failure on bounds: [lower=%s, upper=%s]", e, lower, upper);
}
finally {
checkDestroyed();
@@ -1182,7 +1182,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
throw new IgniteCheckedException("Runtime failure on first row lookup", e);
}
catch (RuntimeException | AssertionError e) {
- throw new CorruptedTreeException("Runtime failure on first row lookup", e);
+ throw createCorruptedTreeException("Runtime failure on first row lookup", e);
}
finally {
checkDestroyed();
@@ -1253,7 +1253,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
throw new IgniteCheckedException("Runtime failure on lookup row: " + row, e);
}
catch (RuntimeException | AssertionError e) {
- throw new CorruptedTreeException("Runtime failure on lookup row: " + row, e);
+ throw createCorruptedTreeException("Runtime failure on lookup row: %s", e, row);
}
finally {
checkDestroyed();
@@ -1813,7 +1813,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
throw new IgniteCheckedException("Runtime failure on search row: " + row, e);
}
catch (RuntimeException | AssertionError e) {
- throw new CorruptedTreeException("Runtime failure on search row: " + row, e);
+ throw createCorruptedTreeException("Runtime failure on search row: %s", e, row);
}
finally {
x.releaseAll();
@@ -1970,7 +1970,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
throw new IgniteCheckedException("Runtime failure on search row: " + row, e);
}
catch (RuntimeException | AssertionError e) {
- throw new CorruptedTreeException("Runtime failure on search row: " + row, e);
+ throw createCorruptedTreeException("Runtime failure on search row: %s", e, row);
}
finally {
r.releaseAll();
@@ -2322,7 +2322,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
throw new IgniteCheckedException("Runtime failure on row: " + row, e);
}
catch (RuntimeException | AssertionError e) {
- throw new CorruptedTreeException("Runtime failure on row: " + row, e);
+ throw createCorruptedTreeException("Runtime failure on row: %s", e, row);
}
finally {
checkDestroyed();
@@ -5835,4 +5835,22 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
protected IoStatisticsHolder statisticsHolder() {
return IoStatisticsHolderNoOp.INSTANCE;
}
+
+ /**
+ * Creates a new instance of {@link CorruptedTreeException} whose message is {@code String.format(message, rows)}.
+ *
+ * @param message Format string of the detailed error message.
+ * @param cause The cause.
+ * @param rows Format arguments (typically the rows involved); may be empty.
+ * @return New instance of {@link CorruptedTreeException}; if formatting fails, the unformatted {@code message} is used.
+ */
+ private CorruptedTreeException createCorruptedTreeException(String message, Throwable cause, Object... rows) {
+ try {
+ return new CorruptedTreeException(String.format(message, rows), cause);
+ }
+ catch (Throwable t) {
+ // Formatting failed (e.g. a row's toString() threw): keep the message template instead of an empty message.
+ return new CorruptedTreeException(message, cause);
+ }
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
index f744b3f..9af42d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
@@ -310,7 +310,7 @@ public abstract class AbstractWalRecordsIterator
* @param desc File descriptor.
* @param start Optional start pointer. Null means read from the beginning.
* @param fileIO fileIO associated with file descriptor
- * @param segmentHeader read segment header from fileIO
+ * @param segmentHdr read segment header from fileIO
* @return Initialized file read header.
* @throws IgniteCheckedException If initialized failed due to another unexpected error.
*/
@@ -318,10 +318,10 @@ public abstract class AbstractWalRecordsIterator
@NotNull final AbstractFileDescriptor desc,
@Nullable final FileWALPointer start,
@NotNull final SegmentIO fileIO,
- @NotNull final SegmentHeader segmentHeader
+ @NotNull final SegmentHeader segmentHdr
) throws IgniteCheckedException {
try {
- boolean isCompacted = segmentHeader.isCompacted();
+ boolean isCompacted = segmentHdr.isCompacted();
if (isCompacted)
serializerFactory.skipPositionCheck(true);
@@ -341,7 +341,7 @@ public abstract class AbstractWalRecordsIterator
}
}
- int serVer = segmentHeader.getSerializerVersion();
+ int serVer = segmentHdr.getSerializerVersion();
return createReadFileHandle(fileIO, serializerFactory.createSerializer(serVer), in);
}
@@ -366,6 +366,9 @@ public abstract class AbstractWalRecordsIterator
throw new IgniteCheckedException(
"Failed to initialize WAL segment after reading segment header: " + desc.file().getAbsolutePath(), e);
}
+ finally {
+ serializerFactory.clearSegmentLocalState();
+ }
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 094ade7..3aa85ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -29,7 +29,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.ClosedByInterruptException;
-import java.nio.channels.FileChannel;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.sql.Time;
@@ -127,7 +126,6 @@ import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static java.nio.file.StandardOpenOption.CREATE;
-import static java.nio.file.StandardOpenOption.CREATE_NEW;
import static java.nio.file.StandardOpenOption.READ;
import static java.nio.file.StandardOpenOption.WRITE;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_CHECKPOINT_TRIGGER_ARCHIVE_SIZE_PERCENTAGE;
@@ -449,7 +447,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
});
- segmentAware = new SegmentAware(dsCfg.getWalSegments(), dsCfg.isWalCompactionEnabled());
+ segmentAware = new SegmentAware(dsCfg.getWalSegments(), dsCfg.isWalCompactionEnabled(), log);
if (isArchiverEnabled())
archiver = new FileArchiver(segmentAware, log);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
index 3281d7c..7e01728 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
@@ -16,6 +16,7 @@
package org.apache.ignite.internal.processors.cache.persistence.wal.aware;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import static org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentArchivedStorage.buildArchivedStorage;
@@ -40,10 +41,11 @@ public class SegmentAware {
/**
* @param walSegmentsCnt Total WAL segments count.
* @param compactionEnabled Is wal compaction enabled.
+ * @param log Logger.
*/
- public SegmentAware(int walSegmentsCnt, boolean compactionEnabled) {
+ public SegmentAware(int walSegmentsCnt, boolean compactionEnabled, IgniteLogger log) {
segmentCurrStateStorage = buildCurrentStateStorage(walSegmentsCnt, segmentArchivedStorage);
- segmentCompressStorage = buildCompressStorage(segmentArchivedStorage, compactionEnabled);
+ segmentCompressStorage = buildCompressStorage(segmentArchivedStorage, compactionEnabled, log);
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java
index 458f119..c565bf0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java
@@ -16,11 +16,12 @@
package org.apache.ignite.internal.processors.cache.persistence.wal.aware;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
/**
* Storage of actual information about current index of compressed segments.
@@ -53,25 +54,37 @@ public class SegmentCompressStorage {
/** Min uncompressed index to keep. */
private volatile long minUncompressedIdxToKeep = -1L;
+ /** Logger. */
+ private final IgniteLogger log;
+
/**
* @param segmentArchivedStorage Storage of last archived segment.
* @param compactionEnabled If WAL compaction enabled.
+ * @param log Logger.
*/
- private SegmentCompressStorage(SegmentArchivedStorage segmentArchivedStorage, boolean compactionEnabled) {
+ private SegmentCompressStorage(
+ SegmentArchivedStorage segmentArchivedStorage,
+ boolean compactionEnabled,
+ IgniteLogger log) {
this.segmentArchivedStorage = segmentArchivedStorage;
this.compactionEnabled = compactionEnabled;
this.segmentArchivedStorage.addObserver(this::onSegmentArchived);
+
+ this.log = log;
}
/**
* @param segmentArchivedStorage Storage of last archived segment.
* @param compactionEnabled If WAL compaction enabled.
+ * @param log Logger.
*/
- static SegmentCompressStorage buildCompressStorage(SegmentArchivedStorage segmentArchivedStorage,
- boolean compactionEnabled) {
- SegmentCompressStorage storage = new SegmentCompressStorage(segmentArchivedStorage, compactionEnabled);
+ static SegmentCompressStorage buildCompressStorage(
+ SegmentArchivedStorage segmentArchivedStorage,
+ boolean compactionEnabled,
+ IgniteLogger log) {
+ SegmentCompressStorage storage = new SegmentCompressStorage(segmentArchivedStorage, compactionEnabled, log);
segmentArchivedStorage.addObserver(storage::onSegmentArchived);
@@ -79,11 +92,22 @@ public class SegmentCompressStorage {
}
/**
+ * Sets the largest index of previously compressed segment.
+ *
+ * @param idx Segment index.
+ */
+ synchronized void lastSegmentCompressed(long idx) {
+ onSegmentCompressed(lastEnqueuedToCompressIdx = idx);
+ }
+
+ /**
* Callback after segment compression finish.
*
* @param compressedIdx Index of compressed segment.
*/
synchronized void onSegmentCompressed(long compressedIdx) {
+ log.info("Segment compressed notification [idx=" + compressedIdx + ']');
+
if (compressedIdx > lastMaxCompressedIdx)
lastMaxCompressedIdx = compressedIdx;
@@ -150,8 +174,11 @@ public class SegmentCompressStorage {
* Callback for waking up compressor when new segment is archived.
*/
private synchronized void onSegmentArchived(long lastAbsArchivedIdx) {
- while (lastEnqueuedToCompressIdx < lastAbsArchivedIdx && compactionEnabled)
+ while (lastEnqueuedToCompressIdx < lastAbsArchivedIdx && compactionEnabled) {
+ log.info("Enqueuing segment for compression [idx=" + (lastEnqueuedToCompressIdx + 1) + ']');
+
segmentsToCompress.add(++lastEnqueuedToCompressIdx);
+ }
notifyAll();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
index e3f1499..9e4305a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
@@ -22,13 +22,16 @@ import java.io.IOException;
import java.io.PrintStream;
import java.nio.ByteOrder;
import java.nio.file.FileVisitResult;
+import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.Optional;
import java.util.TreeSet;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
@@ -265,20 +268,24 @@ public class IgniteWalIteratorFactory {
if (filesOrDirs == null || filesOrDirs.length == 0)
return Collections.emptyList();
- final FileIOFactory ioFactory = iteratorParametersBuilder.ioFactory;
-
final TreeSet<FileDescriptor> descriptors = new TreeSet<>();
for (File file : filesOrDirs) {
if (file.isDirectory()) {
try {
walkFileTree(file.toPath(), new SimpleFileVisitor<Path>() {
- @Override
- public FileVisitResult visitFile(Path path, BasicFileAttributes attrs) {
- addFileDescriptor(path.toFile(), ioFactory, descriptors);
+ @Override public FileVisitResult visitFile(Path path, BasicFileAttributes attrs) {
+ addFileDescriptor(path.toFile(), descriptors, iteratorParametersBuilder);
return FileVisitResult.CONTINUE;
}
+
+ @Override public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
+ if (exc instanceof NoSuchFileException)
+ return FileVisitResult.CONTINUE;
+
+ return super.visitFileFailed(file, exc);
+ }
});
}
catch (IOException e) {
@@ -288,31 +295,42 @@ public class IgniteWalIteratorFactory {
continue;
}
- addFileDescriptor(file, ioFactory, descriptors);
+ addFileDescriptor(file, descriptors, iteratorParametersBuilder);
}
return new ArrayList<>(descriptors);
}
/**
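+ * @param file Candidate WAL segment file.
+ * @param descriptors Collection the descriptor is added to when its index is within the configured bounds.
+ * @param params Iterator parameters providing the IO factory and the low/high bounds.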
+ */
+ private void addFileDescriptor(
+ File file,
+ Collection<FileDescriptor> descriptors,
+ IteratorParametersBuilder params)
+ {
+ Optional.ofNullable(getFileDescriptor(file, params.ioFactory))
+ .filter(desc -> desc.idx() >= params.lowBound.index() && desc.idx() <= params.highBound.index())
+ .ifPresent(descriptors::add);
+ }
+
+ /**
* @param file File.
* @param ioFactory IO factory.
- * @param descriptors List of descriptors.
*/
- private void addFileDescriptor(File file, FileIOFactory ioFactory, TreeSet<FileDescriptor> descriptors) {
+ private FileDescriptor getFileDescriptor(File file, FileIOFactory ioFactory) {
if (file.length() < HEADER_RECORD_SIZE)
- return; // Filter out this segment as it is too short.
+ return null; // Filter out this segment as it is too short.
String fileName = file.getName();
if (!WAL_NAME_PATTERN.matcher(fileName).matches() &&
!WAL_SEGMENT_FILE_COMPACTED_PATTERN.matcher(fileName).matches())
- return; // Filter out this because it is not segment file.
-
- FileDescriptor desc = readFileDescriptor(file, ioFactory);
+ return null; // Filter out this because it is not segment file.
- if (desc != null)
- descriptors.add(desc);
+ return readFileDescriptor(file, ioFactory);
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
index f14bcac..293b961 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
@@ -87,6 +87,7 @@ import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor;
import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
import org.apache.ignite.internal.processors.task.GridTaskProcessor;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
+import org.apache.ignite.internal.processors.txdr.TransactionalDrProcessor;
import org.apache.ignite.internal.stat.IoStatisticsManager;
import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
import org.apache.ignite.internal.util.IgniteExceptionRegistry;
@@ -463,6 +464,11 @@ public class StandaloneGridKernalContext implements GridKernalContext {
}
/** {@inheritDoc} */
+ @Override public TransactionalDrProcessor txDr() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
@Override public GridLoadBalancerManager loadBalancing() {
return null;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
index 4044f3a..5bd5a21 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
@@ -283,9 +283,14 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
if (tup == null)
return tup;
- if (!checkBounds(tup.get1())) {
+ if (tup.get2() instanceof FilteredRecord)
+ return new T2<>(tup.get1(), FilteredRecord.INSTANCE);
+
+ WALPointer originalPtr = tup.get2().position();
+
+ if (!checkBounds(originalPtr)) {
if (curRec != null) {
- FileWALPointer prevRecPtr = (FileWALPointer)curRec.get1();
+ FileWALPointer prevRecPtr = (FileWALPointer)curRec.get2().position();
// Fast stop condition, after high bound reached.
if (prevRecPtr != null && prevRecPtr.compareTo(highBound) > 0)
@@ -471,7 +476,7 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
* @param keepBinary Don't convert non primitive types.
* @return Unwrapped entry.
*/
- private DataEntry unwrapDataEntry(CacheObjectContext coCtx, DataEntry dataEntry,
+ private @NotNull DataEntry unwrapDataEntry(CacheObjectContext coCtx, DataEntry dataEntry,
KeyCacheObject key, CacheObject val, boolean keepBinary) {
if (dataEntry instanceof MvccDataEntry)
return new UnwrapMvccDataEntry(
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
index 5e84253..f8285d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
@@ -28,6 +28,7 @@ import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.pagemem.wal.record.CacheState;
import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
+import org.apache.ignite.internal.pagemem.wal.record.ConsistentCutRecord;
import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.pagemem.wal.record.ExchangeRecord;
@@ -108,6 +109,9 @@ public class RecordDataV2Serializer extends RecordDataV1Serializer {
case MVCC_TX_RECORD:
return txRecordSerializer.size((MvccTxRecord)rec);
+ case CONSISTENT_CUT:
+ return 0;
+
default:
return super.plainSize(rec);
}
@@ -192,6 +196,9 @@ public class RecordDataV2Serializer extends RecordDataV1Serializer {
case MVCC_TX_RECORD:
return txRecordSerializer.readMvccTx(in);
+ case CONSISTENT_CUT:
+ return new ConsistentCutRecord();
+
default:
return super.readPlainRecord(type, in, encrypted);
}
@@ -274,6 +281,9 @@ public class RecordDataV2Serializer extends RecordDataV1Serializer {
break;
+ case CONSISTENT_CUT:
+ break;
+
default:
super.writePlainRecord(rec, buf);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactory.java
index 923949c..6921b3d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactory.java
@@ -69,4 +69,9 @@ public interface RecordSerializerFactory {
* @param skipPositionCheck Skip position check.
*/
public RecordSerializerFactory skipPositionCheck(boolean skipPositionCheck);
+
+ /**
+ * Clears factory parameters that are relevant only to a specific WAL segment.
+ */
+ public RecordSerializerFactory clearSegmentLocalState();
}
\ No newline at end of file
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactoryImpl.java
index bf9c3cd..4a93d55 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactoryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactoryImpl.java
@@ -148,4 +148,12 @@ public class RecordSerializerFactoryImpl implements RecordSerializerFactory {
return this;
}
+
+ /** {@inheritDoc} */
+ @Override public RecordSerializerFactory clearSegmentLocalState() {
+ skipPositionCheck = false;
+ recordDeserializeFilter = null;
+
+ return this;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index ed8923b..9c89cfb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -835,6 +835,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
if (grp.persistenceEnabled() && grp.walEnabled() &&
cctx.snapshot().needTxReadLogging()) {
+ cctx.tm().pendingTxsTracker().onKeysRead(
+ nearXidVersion(), Collections.singletonList(txEntry.key()));
+
ptr = cctx.wal().log(new DataRecord(new DataEntry(
cacheCtx.cacheId(),
txEntry.key(),
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 12ea9d9..ed8f010 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -232,7 +232,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
private TxDeadlockDetection txDeadlockDetection;
/** Flag indicates that {@link TxRecord} records will be logged to WAL. */
- private boolean logTxRecords;
+ private volatile boolean logTxRecords;
+
+ /** Pending transactions tracker. */
+ private LocalPendingTransactionsTracker pendingTracker;
/** {@inheritDoc} */
@Override protected void onKernalStop0(boolean cancel) {
@@ -307,6 +310,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
cctx.gridIO().addMessageListener(TOPIC_TX, new DeadlockDetectionListener());
+ this.pendingTracker = new LocalPendingTransactionsTracker(cctx);
+
+ // todo gg-13416 unhardcode
this.logTxRecords = IgniteSystemProperties.getBoolean(IGNITE_WAL_LOG_TX_RECORDS, false);
}
@@ -2499,6 +2505,27 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
/**
+ * @return Pending transactions tracker.
+ */
+ public LocalPendingTransactionsTracker pendingTxsTracker() {
+ return pendingTracker;
+ }
+
+ /**
+ * Enables pending transactions tracker.
+ * Also enables transaction WAL logging if it was disabled.
+ */
+ public void trackPendingTxs() {
+ pendingTracker.enable();
+
+ if (!logTxRecords) {
+ logTxRecords = true;
+
+ U.warn(log, "Transaction wal logging is enabled, because pending transaction tracker is enabled.");
+ }
+ }
+
+ /**
* Sets MVCC state.
*
* @param tx Transaction.
@@ -2555,7 +2582,18 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
record = new TxRecord(tx.state(), tx.nearXidVersion(), tx.writeVersion(), nodes);
try {
- return cctx.wal().log(record);
+ WALPointer ptr = cctx.wal().log(record);
+
+ TransactionState txState = tx.state();
+
+ if (txState == PREPARED)
+ cctx.tm().pendingTxsTracker().onTxPrepared(tx.nearXidVersion());
+ else if (txState == ROLLED_BACK)
+ cctx.tm().pendingTxsTracker().onTxRolledBack(tx.nearXidVersion());
+ else
+ cctx.tm().pendingTxsTracker().onTxCommitted(tx.nearXidVersion());
+
+ return ptr;
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to log TxRecord: " + record, e);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/LocalPendingTransactionsTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/LocalPendingTransactionsTracker.java
new file mode 100644
index 0000000..3aa179e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/LocalPendingTransactionsTracker.java
@@ -0,0 +1,590 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.internal.GridKernalState;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_PENDING_TX_TRACKER_ENABLED;
+
+/**
+ * Tracks pending transactions for purposes of consistent cut algorithm.
+ */
+public class LocalPendingTransactionsTracker {
+ /** Shared cache context. */
+ private final GridCacheSharedContext<?, ?> cctx;
+
+ /** Logger. */
+ private final IgniteLogger log;
+
+ /** Currently committing transactions. */
+ private final Set<GridCacheVersion> currentlyCommittingTxs = U.newConcurrentHashSet();
+
+ /** Tracker enabled. */
+ private volatile boolean enabled = IgniteSystemProperties.getBoolean(IGNITE_PENDING_TX_TRACKER_ENABLED, false);
+
+ /** Currently prepared transactions. Counters are incremented on prepare, decremented on commit/rollback. */
+ private final ConcurrentHashMap<GridCacheVersion, Integer> preparedCommittedTxsCounters = new ConcurrentHashMap<>();
+
+ /**
+ * Transactions that were transitioned to prepared state since last {@link #startTrackingPrepared()} call.
+ * A transaction remains in this set after commit/rollback.
+ */
+ private volatile GridConcurrentHashSet<GridCacheVersion> trackedPreparedTxs = new GridConcurrentHashSet<>();
+
+ /** Transactions that were transitioned to committed state since last {@link #startTrackingCommitted()} call. */
+ private volatile GridConcurrentHashSet<GridCacheVersion> trackedCommittedTxs = new GridConcurrentHashSet<>();
+
+ /** Written keys mapped to near xid versions of the transactions that wrote them. */
+ private volatile ConcurrentHashMap<KeyCacheObject, Set<GridCacheVersion>> writtenKeysToNearXidVer = new ConcurrentHashMap<>();
+
+ /** Graph of dependent (by keys) transactions. */
+ private volatile ConcurrentHashMap<GridCacheVersion, Set<GridCacheVersion>> dependentTransactionsGraph = new ConcurrentHashMap<>();
+ // todo GG-13416: maybe handle local sequential consistency with threadId
+
+ /** State rw-lock. */
+ private final ReentrantReadWriteLock stateLock = new ReentrantReadWriteLock();
+
+ /** Track prepared flag. */
+ private final AtomicBoolean trackPrepared = new AtomicBoolean(false);
+
+ /** Track committed flag. */
+ private final AtomicBoolean trackCommitted = new AtomicBoolean(false);
+
+ /** Active tx finish awaiting, {@code null} when no awaiting is in progress. */
+ private volatile TxFinishAwaiting txFinishAwaiting;
+
+ /**
+ * Tx finish awaiting facility.
+ */
+ private class TxFinishAwaiting {
+ /** Future completed with the transactions that are still pending when awaiting finishes. */
+ private final GridFutureAdapter<Set<GridCacheVersion>> fut;
+
+ /** Not committed in timeout txs. */
+ private final Set<GridCacheVersion> notCommittedInTimeoutTxs;
+
+ /** Committing txs. */
+ private final Set<GridCacheVersion> committingTxs;
+
+ /** Global committing txs added. */
+ private volatile boolean globalCommittingTxsAdded;
+
+ /** Set once the prepared txs timeout has elapsed. */
+ private volatile boolean awaitingPreparedIsDone;
+
+ /** Set once the committing txs timeout has elapsed. */
+ private volatile boolean timeout;
+
+ /**
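+ * @param preparedTxsTimeout Prepared txs timeout in milliseconds, must be positive.
+ * @param committingTxsTimeout Committing txs timeout in milliseconds, must be not less than {@code preparedTxsTimeout}.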
+ */
+ private TxFinishAwaiting(final long preparedTxsTimeout, final long committingTxsTimeout) {
+ assert preparedTxsTimeout > 0 : preparedTxsTimeout;
+ assert committingTxsTimeout > 0 : committingTxsTimeout;
+ assert committingTxsTimeout >= preparedTxsTimeout : committingTxsTimeout + " < " + preparedTxsTimeout;
+
+ fut = new GridFutureAdapter<>();
+
+ notCommittedInTimeoutTxs = new GridConcurrentHashSet<>(preparedCommittedTxsCounters.keySet());
+
+ committingTxs = U.newConcurrentHashSet(currentlyCommittingTxs);
+
+ if (committingTxsTimeout > preparedTxsTimeout) {
+ cctx.time().addTimeoutObject(new GridTimeoutObjectAdapter(preparedTxsTimeout) {
+ @Override public void onTimeout() {
+ awaitingPreparedIsDone = true;
+
+ if (TxFinishAwaiting.this != txFinishAwaiting || fut.isDone())
+ return;
+
+ stateLock.readLock().lock();
+
+ try {
+ if (allCommittingIsFinished())
+ finish();
+ else
+ log.warning("Committing transactions not completed in " + preparedTxsTimeout + " ms: "
+ + committingTxs);
+ }
+ finally {
+ stateLock.readLock().unlock();
+ }
+ }
+ });
+ }
+
+ cctx.time().addTimeoutObject(new GridTimeoutObjectAdapter(committingTxsTimeout) {
+ @Override public void onTimeout() {
+ timeout = true;
+
+ if (committingTxsTimeout == preparedTxsTimeout)
+ awaitingPreparedIsDone = true;
+
+ if (TxFinishAwaiting.this != txFinishAwaiting || fut.isDone())
+ return;
+
+ stateLock.readLock().lock();
+
+ try {
+ if (!allCommittingIsFinished())
+ log.warning("Committing transactions not completed in " + committingTxsTimeout + " ms: "
+ + committingTxs);
+
+ finish();
+ }
+ finally {
+ stateLock.readLock().unlock();
+ }
+ }
+ });
+ }
+
+ /**
+ * @param nearXidVer Near xid version of the transaction that has just finished.
+ */
+ void onTxFinished(GridCacheVersion nearXidVer) {
+ notCommittedInTimeoutTxs.remove(nearXidVer);
+
+ checkTxsFinished();
+ }
+
+ /**
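+ * Tries to finish awaiting if no pending txs remain, or prepared timeout elapsed and all committing txs finished.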
+ */
+ void checkTxsFinished() {
+ if (notCommittedInTimeoutTxs.isEmpty() || awaitingPreparedIsDone && allCommittingIsFinished())
+ finish();
+ }
+
+ /**
+ * Completes the future, but only if global committing txs were already added or the timeout elapsed.
+ */
+ void finish() {
+ if (globalCommittingTxsAdded || timeout) {
+ txFinishAwaiting = null;
+
+ fut.onDone(notCommittedInTimeoutTxs.isEmpty() ?
+ Collections.emptySet() :
+ U.sealSet(notCommittedInTimeoutTxs));
+ }
+ }
+
+ /**
+ * @return {@code true} if no committing transactions remain after dropping those that are no longer pending.
+ */
+ boolean allCommittingIsFinished() {
+ committingTxs.retainAll(notCommittedInTimeoutTxs);
+
+ return committingTxs.isEmpty();
+ }
+
+ /**
+ * @param globalCommittingTxs Global committing txs.
+ */
+ void addGlobalCommittingTxs(Set<GridCacheVersion> globalCommittingTxs) {
+ assert stateLock.writeLock().isHeldByCurrentThread();
+
+ notCommittedInTimeoutTxs.addAll(preparedCommittedTxsCounters.keySet());
+
+ Set<GridCacheVersion> pendingTxs = new HashSet<>(notCommittedInTimeoutTxs);
+
+ pendingTxs.retainAll(globalCommittingTxs);
+
+ committingTxs.addAll(pendingTxs);
+
+ globalCommittingTxsAdded = true;
+
+ assert !fut.isDone() || timeout;
+
+ checkTxsFinished();
+ }
+ }
+
+ /**
+ * @param cctx Shared cache context.
+ */
+ public LocalPendingTransactionsTracker(GridCacheSharedContext<?, ?> cctx) {
+ this.cctx = cctx;
+
+ log = cctx.logger(getClass());
+ }
+
+ /**
+ * Enable pending transactions tracking.
+ */
+ void enable() {
+ assert cctx.kernalContext().gateway().getState() == GridKernalState.STARTING;
+
+ enabled = true;
+ }
+
+ /**
+ * @return whether this tracker is enabled or not.
+ */
+ public boolean enabled() {
+ return enabled;
+ }
+
+ /**
+ * Returns a collection of transactions {@code P2} that are prepared but yet not committed
+ * between phase {@code Cut1} and phase {@code Cut2}.
+ *
+ * @return Collection of prepared transactions.
+ */
+ public Set<GridCacheVersion> currentlyPreparedTxs() {
+ assert stateLock.writeLock().isHeldByCurrentThread();
+
+ return U.sealSet(preparedCommittedTxsCounters.keySet());
+ }
+
+ /**
+ * Starts tracking transactions that will form a set of transactions {@code P23}
+ * that were prepared since phase {@code Cut2} to phase {@code Cut3}.
+ */
+ public void startTrackingPrepared() {
+ assert stateLock.writeLock().isHeldByCurrentThread();
+ assert !trackPrepared.get(): "Tracking prepared transactions is already initialized.";
+
+ trackPrepared.set(true);
+ }
+
+ /**
+ * @return Near xid versions of transactions prepared since {@link #startTrackingPrepared()} call.
+ */
+ public Set<GridCacheVersion> stopTrackingPrepared() {
+ assert stateLock.writeLock().isHeldByCurrentThread();
+ assert trackPrepared.get(): "Tracking prepared transactions is not initialized yet.";
+
+ trackPrepared.set(false);
+
+ Set<GridCacheVersion> res = U.sealSet(trackedPreparedTxs);
+
+ trackedPreparedTxs = new GridConcurrentHashSet<>();
+
+ return res;
+ }
+
+ /**
+ * Starts tracking committed transactions {@code C12} between phase {@code Cut1} and phase {@code Cut2}.
+ */
+ public void startTrackingCommitted() {
+ assert stateLock.writeLock().isHeldByCurrentThread();
+ assert !trackCommitted.get() : "Tracking committed transactions is already initialized.";
+
+ trackCommitted.set(true);
+ }
+
+ /**
+ * @return Committed transactions and their dependency graph collected since {@link #startTrackingCommitted()} call.
+ */
+ public TrackCommittedResult stopTrackingCommitted() {
+ assert stateLock.writeLock().isHeldByCurrentThread();
+ assert trackCommitted.get() : "Tracking committed transactions is not initialized yet.";
+
+ trackCommitted.set(false);
+
+ Set<GridCacheVersion> committedTxs = U.sealSet(trackedCommittedTxs);
+
+ Map<GridCacheVersion, Set<GridCacheVersion>> dependentTxs = U.sealMap(dependentTransactionsGraph);
+
+ trackedCommittedTxs = new GridConcurrentHashSet<>();
+
+ writtenKeysToNearXidVer = new ConcurrentHashMap<>();
+
+ dependentTransactionsGraph = new ConcurrentHashMap<>();
+
+ return new TrackCommittedResult(committedTxs, dependentTxs);
+ }
+
+ /**
+ * @param preparedTxsTimeout Timeout in milliseconds for awaiting of prepared transactions.
+ * @param committingTxsTimeout Timeout in milliseconds for awaiting of committing transactions.
+ * @return Collection of local transactions in committing state.
+ */
+ public Set<GridCacheVersion> startTxFinishAwaiting(
+ long preparedTxsTimeout, long committingTxsTimeout) {
+
+ assert stateLock.writeLock().isHeldByCurrentThread();
+
+ assert txFinishAwaiting == null : txFinishAwaiting;
+
+ TxFinishAwaiting awaiting = new TxFinishAwaiting(preparedTxsTimeout, committingTxsTimeout);
+
+ txFinishAwaiting = awaiting;
+
+ return awaiting.committingTxs;
+ }
+
+ /**
+ * @param globalCommittingTxs Global committing transactions.
+ * @return Future with collection of transactions that failed to finish within timeout.
+ */
+ public IgniteInternalFuture<Set<GridCacheVersion>> awaitPendingTxsFinished(
+ Set<GridCacheVersion> globalCommittingTxs
+ ) {
+ assert stateLock.writeLock().isHeldByCurrentThread();
+
+ TxFinishAwaiting awaiting = txFinishAwaiting;
+
+ assert awaiting != null;
+
+ awaiting.addGlobalCommittingTxs(globalCommittingTxs);
+
+ return awaiting.fut;
+ }
+
+ /**
+ * Freezes state of all tracker collections. Any active transactions that modify collections will
+ * wait on readLock().
+ * Can be used to obtain consistent snapshot of several collections.
+ */
+ public void writeLockState() {
+ stateLock.writeLock().lock();
+ }
+
+ /**
+ * Unfreezes state of all tracker collections, releases waiting transactions.
+ */
+ public void writeUnlockState() {
+ stateLock.writeLock().unlock();
+ }
+
+ /**
+ * @param nearXidVer Near xid version.
+ */
+ public void onTxPrepared(GridCacheVersion nearXidVer) {
+ if (!enabled)
+ return;
+
+ stateLock.readLock().lock();
+
+ try {
+ preparedCommittedTxsCounters.compute(nearXidVer, (key, value) -> value == null ? 1 : value + 1);
+
+ if (trackPrepared.get())
+ trackedPreparedTxs.add(nearXidVer);
+ }
+ finally {
+ stateLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * @param nearXidVer Near xid version.
+ */
+ public void onTxCommitted(GridCacheVersion nearXidVer) {
+ if (!enabled)
+ return;
+
+ stateLock.readLock().lock();
+
+ try {
+ Integer newCntr = preparedCommittedTxsCounters.compute(nearXidVer, (key, value) -> {
+ if (value == null || value <= 0) {
+ throw new AssertionError("Committing transaction that was rolled back or concurrently committed " +
+ "[nearXidVer=" + nearXidVer + ", currentCntr=" + value + ']');
+ }
+
+ if (value == 1)
+ return null;
+
+ return value - 1;
+ });
+
+ if (newCntr == null) {
+ currentlyCommittingTxs.remove(nearXidVer);
+
+ if (trackCommitted.get())
+ trackedCommittedTxs.add(nearXidVer);
+
+ checkTxFinishFutureDone(nearXidVer);
+ }
+ }
+ finally {
+ stateLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * @param nearXidVer Near xid version.
+ */
+ public void onTxRolledBack(GridCacheVersion nearXidVer) {
+ if (!enabled)
+ return;
+
+ stateLock.readLock().lock();
+
+ try {
+ Integer newCntr = preparedCommittedTxsCounters.compute(nearXidVer, (key, value) -> {
+ if (value == null || value <= 1)
+ return null;
+
+ return value - 1;
+ });
+
+ if (newCntr == null) {
+ currentlyCommittingTxs.remove(nearXidVer);
+
+ checkTxFinishFutureDone(nearXidVer);
+ }
+ }
+ finally {
+ stateLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * @param nearXidVer Near xid version.
+ * @param keys Keys.
+ */
+ public void onKeysWritten(GridCacheVersion nearXidVer, List<KeyCacheObject> keys) {
+ if (!enabled)
+ return;
+
+ stateLock.readLock().lock();
+
+ try {
+ if (!preparedCommittedTxsCounters.containsKey(nearXidVer))
+ throw new AssertionError("Tx should be in PREPARED state when logging data records: " + nearXidVer);
+
+ currentlyCommittingTxs.add(nearXidVer);
+
+ if (!trackCommitted.get())
+ return;
+
+ for (KeyCacheObject key : keys) {
+ writtenKeysToNearXidVer.compute(key, (keyObj, keyTxsSet) -> {
+ Set<GridCacheVersion> keyTxs = keyTxsSet == null ? new HashSet<>() : keyTxsSet;
+
+ for (GridCacheVersion previousTx : keyTxs) {
+ dependentTransactionsGraph.compute(previousTx, (tx, depTxsSet) -> {
+ Set<GridCacheVersion> dependentTxs = depTxsSet == null ? new HashSet<>() : depTxsSet;
+
+ dependentTxs.add(nearXidVer);
+
+ return dependentTxs;
+ });
+ }
+
+ keyTxs.add(nearXidVer);
+
+ return keyTxs;
+ });
+ }
+ }
+ finally {
+ stateLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * @param nearXidVer Near xid version.
+ * @param keys Keys.
+ */
+ public void onKeysRead(GridCacheVersion nearXidVer, List<KeyCacheObject> keys) {
+ if (!enabled)
+ return;
+
+ stateLock.readLock().lock();
+
+ try {
+ if (!preparedCommittedTxsCounters.containsKey(nearXidVer))
+ throw new AssertionError("Tx should be in PREPARED state when logging data records: " + nearXidVer);
+
+ currentlyCommittingTxs.add(nearXidVer);
+
+ if (!trackCommitted.get())
+ return;
+
+ for (KeyCacheObject key : keys) {
+ writtenKeysToNearXidVer.computeIfPresent(key, (keyObj, keyTxsSet) -> {
+ for (GridCacheVersion previousTx : keyTxsSet) {
+ dependentTransactionsGraph.compute(previousTx, (tx, depTxsSet) -> {
+ Set<GridCacheVersion> dependentTxs = depTxsSet == null ? new HashSet<>() : depTxsSet;
+
+ dependentTxs.add(nearXidVer);
+
+ return dependentTxs;
+ });
+ }
+
+ return keyTxsSet;
+ });
+ }
+ }
+ finally {
+ stateLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Resets the state of this tracker.
+ */
+ public void reset() {
+ stateLock.writeLock().lock();
+
+ try {
+ txFinishAwaiting = null;
+
+ trackCommitted.set(false);
+
+ trackedCommittedTxs = new GridConcurrentHashSet<>();
+
+ trackPrepared.set(false);
+
+ trackedPreparedTxs = new GridConcurrentHashSet<>();
+
+ writtenKeysToNearXidVer = new ConcurrentHashMap<>();
+
+ dependentTransactionsGraph = new ConcurrentHashMap<>();
+ }
+ finally {
+ stateLock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * @param nearXidVer Near xid version.
+ */
+ private void checkTxFinishFutureDone(GridCacheVersion nearXidVer) {
+ if (!enabled)
+ return;
+
+ TxFinishAwaiting awaiting = txFinishAwaiting;
+
+ if (awaiting != null)
+ awaiting.onTxFinished(nearXidVer);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TrackCommittedResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TrackCommittedResult.java
new file mode 100644
index 0000000..2ac5097
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TrackCommittedResult.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+
+/**
+ * Tuple for result of {@link LocalPendingTransactionsTracker#stopTrackingCommitted()}: the set of transactions
+ * committed during the tracked period together with the graph of their key-based dependencies.
+ * <p>
+ * The collections passed to the constructor are stored and returned as is, without copying.
+ */
+public class TrackCommittedResult {
+ /** Transactions committed during tracked period. */
+ private final Set<GridCacheVersion> committedTxs;
+
+ /** Graph of dependent (by keys) transactions. */
+ private final Map<GridCacheVersion, Set<GridCacheVersion>> dependentTxsGraph;
+
+ /**
+ * @param committedTxs Committed txs.
+ * @param dependentTxsGraph Dependent txs graph.
+ */
+ public TrackCommittedResult(
+ Set<GridCacheVersion> committedTxs,
+ Map<GridCacheVersion, Set<GridCacheVersion>> dependentTxsGraph
+ ) {
+ this.committedTxs = committedTxs;
+ this.dependentTxsGraph = dependentTxsGraph;
+ }
+
+ /**
+ * @return Transactions committed during tracked period.
+ */
+ public Set<GridCacheVersion> committedTxs() {
+ return committedTxs;
+ }
+
+ /**
+ * @return Graph of dependent (by keys) transactions.
+ */
+ public Map<GridCacheVersion, Set<GridCacheVersion>> dependentTxsGraph() {
+ return dependentTxsGraph;
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
index 40b20ee..03e6462 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.DiscoverySpi;
import static org.apache.ignite.events.EventType.EVT_NODE_METRICS_UPDATED;
@@ -61,8 +62,12 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
/** Data center ID. */
private byte dataCenterId;
- /** */
- private long gridStartTime;
+ /**
+ * Oldest cluster node start timestamp, lazily initialized.
+ *
+ * @see DiscoverySpi#getGridStartTime()
+ */
+ private volatile long gridStartTime;
/** */
private GridCacheVersion ISOLATED_STREAMER_VER;
@@ -324,4 +329,11 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
public boolean isStartVersion(GridCacheVersion ver) {
return startVer.equals(ver);
}
+
+ /**
+ * Invalidates the stored start timestamp of the first cluster node by resetting it to {@code 0},
+ * so that it can be reinitialized lazily in the future.
+ */
+ public void invalidateGridStartTime() {
+ gridStartTime = 0;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/BaselineTopologyHistoryItem.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/BaselineTopologyHistoryItem.java
index 7f2b4b0..6a02e42 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/BaselineTopologyHistoryItem.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/BaselineTopologyHistoryItem.java
@@ -19,6 +19,7 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+
import org.apache.ignite.internal.util.typedef.internal.U;
/**
@@ -75,4 +76,23 @@ public class BaselineTopologyHistoryItem implements Serializable {
public List<Long> branchingHistory() {
return branchingHistory;
}
+
+ /**
+ * Returns {@code true} if this baseline topology history item contains a node with the given consistent ID.
+ *
+ * @param consistentId Consistent ID.
+ * @return {@code True} if this baseline topology history item contains a node with the given consistent ID.
+ */
+ public boolean containsNode(Object consistentId) {
+ return consIds.contains(consistentId);
+ }
+
+ /**
+ * Returns a copy of the consistent IDs of the nodes that are included in this baseline topology history item.
+ *
+ * @return Collection of consistent IDs.
+ */
+ public Collection<Object> consistentIds() {
+ return U.arrayList(this.consIds);
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java
index d19585d..d41ff14 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java
@@ -197,6 +197,14 @@ public class ChangeGlobalStateMessage implements DiscoveryCustomMessage {
}
 /**
+ * Sets the timestamp of this message.
+ *
+ * @param timestamp Timestamp.
+ */
+ public void timestamp(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ /**
* @return State change request ID.
*/
public UUID requestId() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index c958e17..6357791 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -1,12 +1,12 @@
/*
* Copyright 2019 GridGain Systems, Inc. and Contributors.
- *
+ *
* Licensed under the GridGain Community Edition License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -78,6 +78,7 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteFuture;
@@ -335,6 +336,13 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
}
 /**
+ * Returns baseline topology history.
+ *
+ * @return Baseline topology history held by this processor.
+ */
+ public BaselineTopologyHistory baselineHistory() {
+ return bltHist;
+ }
+
+ /**
* @param blt Blt.
*/
private void writeBaselineTopology(BaselineTopology blt, BaselineTopologyHistoryItem prevBltHistItem) throws IgniteCheckedException {
@@ -443,20 +451,16 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
/**
* Checks whether all conditions to meet BaselineTopology are satisfied.
*/
- private boolean isBaselineSatisfied(BaselineTopology blt, List<ClusterNode> serverNodes) {
+ private boolean isBaselineSatisfied(BaselineTopology blt, List<ClusterNode> srvNodes) {
if (blt == null)
return false;
if (blt.consistentIds() == null)
return false;
- if (//only node participating in BaselineTopology is allowed to send activation command...
- blt.consistentIds().contains(ctx.discovery().localNode().consistentId())
- //...and with this node BaselineTopology is reached
- && blt.isSatisfied(serverNodes))
- return true;
-
- return false;
+ // Only node participating in BaselineTopology is allowed to send activation command
+ // and with this node BaselineTopology is reached.
+ return blt.consistentIds().contains(ctx.discovery().localNode().consistentId()) && blt.isSatisfied(srvNodes);
}
/** {@inheritDoc} */
@@ -523,10 +527,11 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
) {
DiscoveryDataClusterState state = globalState;
- if (log.isInfoEnabled())
- U.log(log, "Received " + prettyStr(msg.activate()) + " request with BaselineTopology" +
- (msg.baselineTopology() == null ? ": null"
- : "[id=" + msg.baselineTopology().id() + "]"));
+ if (log.isInfoEnabled()) {
+ String baseline = msg.baselineTopology() == null ? ": null" : "[id=" + msg.baselineTopology().id() + ']';
+
+ U.log(log, "Received " + prettyStr(msg.activate()) + " request with BaselineTopology" + baseline);
+ }
if (msg.baselineTopology() != null)
compatibilityMode = false;
@@ -592,7 +597,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
BaselineTopologyHistoryItem bltHistItem = BaselineTopologyHistoryItem.fromBaseline(
globalState.baselineTopology());
- transitionFuts.put(msg.requestId(), new GridFutureAdapter<Void>());
+ transitionFuts.put(msg.requestId(), new GridFutureAdapter<>());
DiscoveryDataClusterState prevState = globalState;
@@ -679,7 +684,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
if (initiatorNode.equals(ctx.localNodeId())) {
GridChangeGlobalStateFuture fut = stateChangeFut.get();
- if (fut != null && fut.requestId.equals(reqId))
+ if (fut != null && fut.reqId.equals(reqId))
return fut;
}
@@ -774,19 +779,19 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
return;
}
- BaselineTopologyHistory historyToSend = null;
+ BaselineTopologyHistory histToSnd = null;
if (!bltHist.isEmpty()) {
if (joiningNodeState != null && joiningNodeState.baselineTopology() != null) {
int lastId = joiningNodeState.baselineTopology().id();
- historyToSend = bltHist.tailFrom(lastId);
+ histToSnd = bltHist.tailFrom(lastId);
}
else
- historyToSend = bltHist;
+ histToSnd = bltHist;
}
- dataBag.addGridCommonData(STATE_PROC.ordinal(), new BaselineStateAndHistoryData(globalState, historyToSend));
+ dataBag.addGridCommonData(STATE_PROC.ordinal(), new BaselineStateAndHistoryData(globalState, histToSnd));
}
/** {@inheritDoc} */
@@ -811,7 +816,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
DiscoveryDataClusterState state = stateDiscoData.globalState;
if (state.transition())
- transitionFuts.put(state.transitionRequestId(), new GridFutureAdapter<Void>());
+ transitionFuts.put(state.transitionRequestId(), new GridFutureAdapter<>());
globalState = state;
@@ -826,9 +831,9 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
@Override public IgniteInternalFuture<?> changeGlobalState(
final boolean activate,
Collection<? extends BaselineNode> baselineNodes,
- boolean forceChangeBaselineTopology
+ boolean forceChangeBaselineTop
) {
- return changeGlobalState(activate, baselineNodes, forceChangeBaselineTopology, false);
+ return changeGlobalState(activate, baselineNodes, forceChangeBaselineTop, false);
}
public IgniteInternalFuture<?> changeGlobalState(
@@ -848,15 +853,16 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
*/
private BaselineTopology calculateNewBaselineTopology(final boolean activate,
Collection<? extends BaselineNode> baselineNodes,
- boolean forceChangeBaselineTopology) {
+ boolean forceChangeBaselineTop
+ ) {
BaselineTopology newBlt;
- BaselineTopology currentBlt = globalState.baselineTopology();
+ BaselineTopology currBlt = globalState.baselineTopology();
int newBltId = 0;
- if (currentBlt != null)
- newBltId = activate ? currentBlt.id() + 1 : currentBlt.id();
+ if (currBlt != null)
+ newBltId = activate ? currBlt.id() + 1 : currBlt.id();
if (baselineNodes != null && !baselineNodes.isEmpty()) {
List<BaselineNode> baselineNodes0 = new ArrayList<>();
@@ -875,16 +881,16 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
baselineNodes = baselineNodes0;
}
- if (forceChangeBaselineTopology)
+ if (forceChangeBaselineTop)
newBlt = BaselineTopology.build(baselineNodes, newBltId);
else if (activate) {
if (baselineNodes == null)
baselineNodes = baselineNodes();
- if (currentBlt == null)
+ if (currBlt == null)
newBlt = BaselineTopology.build(baselineNodes, newBltId);
else {
- newBlt = currentBlt;
+ newBlt = currBlt;
newBlt.updateHistory(baselineNodes);
}
@@ -899,32 +905,38 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
private Collection<BaselineNode> baselineNodes() {
List<ClusterNode> clNodes = ctx.discovery().serverNodes(AffinityTopologyVersion.NONE);
- ArrayList<BaselineNode> bltNodes = new ArrayList<>(clNodes.size());
+ List<BaselineNode> bltNodes = new ArrayList<>(clNodes.size());
- for (ClusterNode clNode : clNodes)
- bltNodes.add(clNode);
+ bltNodes.addAll(clNodes);
return bltNodes;
}
/** */
- private IgniteInternalFuture<?> changeGlobalState0(final boolean activate,
- BaselineTopology blt, boolean forceChangeBaselineTopology) {
- return changeGlobalState0(activate, blt, forceChangeBaselineTopology, false);
+ private IgniteInternalFuture<?> changeGlobalState0(
+ final boolean activate,
+ BaselineTopology blt,
+ boolean forceChangeBaselineTop
+ ) {
+ return changeGlobalState0(activate, blt, forceChangeBaselineTop, false);
}
/** */
- private IgniteInternalFuture<?> changeGlobalState0(final boolean activate,
- BaselineTopology blt, boolean forceChangeBaselineTopology, boolean isAutoAdjust) {
+ private IgniteInternalFuture<?> changeGlobalState0(
+ final boolean activate,
+ BaselineTopology blt,
+ boolean forceChangeBaselineTop,
+ boolean isAutoAdjust
+ ) {
boolean isBaselineAutoAdjustEnabled = isBaselineAutoAdjustEnabled();
- if (forceChangeBaselineTopology && isBaselineAutoAdjustEnabled != isAutoAdjust)
+ if (forceChangeBaselineTop && isBaselineAutoAdjustEnabled != isAutoAdjust)
throw new BaselineAdjustForbiddenException(isBaselineAutoAdjustEnabled);
if (ctx.isDaemon() || ctx.clientNode()) {
GridFutureAdapter<Void> fut = new GridFutureAdapter<>();
- sendComputeChangeGlobalState(activate, blt, forceChangeBaselineTopology, fut);
+ sendComputeChangeGlobalState(activate, blt, forceChangeBaselineTop, fut);
return fut;
}
@@ -995,15 +1007,17 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
}
}
- ChangeGlobalStateMessage msg = new ChangeGlobalStateMessage(startedFut.requestId,
+ ChangeGlobalStateMessage msg = new ChangeGlobalStateMessage(startedFut.reqId,
ctx.localNodeId(),
storedCfgs,
activate,
blt,
- forceChangeBaselineTopology,
+ forceChangeBaselineTop,
System.currentTimeMillis()
);
+ ctx.txDr().onChangeGlobalStateMessagePrepared(msg);
+
IgniteInternalFuture<?> resFut = wrapStateChangeFuture(startedFut, msg);
try {
@@ -1059,16 +1073,17 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
return null;
if (globalState == null || globalState.baselineTopology() == null) {
- if (joiningNodeState != null && joiningNodeState.baselineTopology() != null) {
- String msg = "Node with set up BaselineTopology is not allowed to join cluster without one: " + node.consistentId();
+ if (joiningNodeState.baselineTopology() != null) {
+ String msg = "Node with set up BaselineTopology is not allowed to join cluster without one: " +
+ node.consistentId();
return new IgniteNodeValidationResult(node.id(), msg, msg);
}
}
if (globalState.transition() && globalState.previousBaselineTopology() == null) {
- //case when cluster is activating for the first time and other node with existing baseline topology
- //tries to join
+ // Case when cluster is activating for the first time and other node with existing baseline topology
+ // tries to join.
String msg = "Node with set up BaselineTopology is not allowed " +
"to join cluster in the process of first activation: " + node.consistentId();
@@ -1193,12 +1208,12 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
", daemon" + ctx.isDaemon() + "]");
}
- ClusterGroupAdapter clusterGroupAdapter = (ClusterGroupAdapter)ctx.cluster().get().forServers();
+ ClusterGroupAdapter clusterGrpAdapter = (ClusterGroupAdapter)ctx.cluster().get().forServers();
- if (F.isEmpty(clusterGroupAdapter.nodes()))
+ if (F.isEmpty(clusterGrpAdapter.nodes()))
return new IgniteFinishedFutureImpl<>(false);
- IgniteCompute comp = clusterGroupAdapter.compute();
+ IgniteCompute comp = clusterGrpAdapter.compute();
return comp.callAsync(new IgniteCallable<Boolean>() {
@IgniteInstanceResource
@@ -1283,6 +1298,8 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
distributedBaselineConfiguration.onActivate();
+ ctx.txDr().onActivate(ctx);
+
if (log.isInfoEnabled())
log.info("Successfully performed final activation steps [nodeId="
+ ctx.localNodeId() + ", client=" + client + ", topVer=" + req.topologyVersion() + "]");
@@ -1333,6 +1350,92 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
}
/**
+ * Executes validation checks of cluster state and BaselineTopology before changing BaselineTopology to new one.
+ */
+ public void validateBeforeBaselineChange(Collection<? extends BaselineNode> baselineTop) {
+ verifyBaselineTopologySupport(ctx.discovery().discoCache());
+
+ if (!ctx.state().clusterState().active())
+ throw new IgniteException("Changing BaselineTopology on inactive cluster is not allowed.");
+
+ if (baselineTop != null) {
+ if (baselineTop.isEmpty())
+ throw new IgniteException("BaselineTopology must contain at least one node.");
+
+ Collection<Object> onlineNodes = onlineBaselineNodesRequestedForRemoval(baselineTop);
+
+ if (onlineNodes != null) {
+ if (!onlineNodes.isEmpty()) {
+ throw new IgniteException("Removing online nodes from BaselineTopology is not supported: " +
+ onlineNodes);
+ }
+ }
+ }
+ }
+
+ /**
+ * Verifies that all server nodes in current cluster topology support BaselineTopology feature,
+ * so that compatibilityMode flag can be reset.
+ *
+ * @param discoCache Discovery cache.
+ * @throws IgniteException If some server node is older than the minimal version supporting BaselineTopology;
+ * the message lists consistent ID and version of every such node.
+ */
+ private void verifyBaselineTopologySupport(DiscoCache discoCache) {
+ if (discoCache.minimumServerNodeVersion().compareTo(MIN_BLT_SUPPORTING_VER) >= 0)
+ return;
+
+ SB sb = new SB("Cluster contains nodes that don't support BaselineTopology: [");
+
+ boolean first = true;
+
+ for (ClusterNode cn : discoCache.serverNodes()) {
+ if (cn.version().compareTo(MIN_BLT_SUPPORTING_VER) >= 0)
+ continue;
+
+ // The separator is put in front of every entry but the first one, so nothing has to be trimmed
+ // afterwards and the header stays intact even if no entry gets appended.
+ if (!first)
+ sb.a(", ");
+
+ sb.a('[').a(cn.consistentId()).a(':').a(cn.version()).a(']');
+
+ first = false;
+ }
+
+ throw new IgniteException(sb.a(']').toString());
+ }
+
+ /** */
+ @Nullable private Collection<Object> onlineBaselineNodesRequestedForRemoval(Collection<? extends BaselineNode> newBlt) {
+ BaselineTopology blt = ctx.state().clusterState().baselineTopology();
+ Set<Object> bltConsIds;
+
+ if (blt == null)
+ return null;
+ else
+ bltConsIds = blt.consistentIds();
+
+ ArrayList<Object> onlineNodesRequestedForRemoval = new ArrayList<>();
+
+ Collection<Object> aliveNodesConsIds = getConsistentIds(ctx.discovery().aliveServerNodes());
+
+ Collection<Object> newBltConsIds = getConsistentIds(newBlt);
+
+ for (Object oldBltConsId : bltConsIds) {
+ if (aliveNodesConsIds.contains(oldBltConsId)) {
+ if (!newBltConsIds.contains(oldBltConsId))
+ onlineNodesRequestedForRemoval.add(oldBltConsId);
+ }
+ }
+
+ return onlineNodesRequestedForRemoval;
+ }
+
+ /**
+ * Extracts consistent IDs of the given nodes, in the iteration order of the passed collection.
+ *
+ * @param nodes Nodes.
+ * @return Consistent IDs, one per node.
+ */
+ private Collection<Object> getConsistentIds(Collection<? extends BaselineNode> nodes) {
+ Collection<Object> ids = new ArrayList<>(nodes.size());
+
+ nodes.forEach(node -> ids.add(node.consistentId()));
+
+ return ids;
+ }
+
+ /**
* @param reqId Request ID.
* @param initNodeId Initialize node id.
* @param ex Exception.
@@ -1374,14 +1477,14 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
if (log.isDebugEnabled()) {
log.debug("Received activation response [requestId=" + msg.getRequestId() +
- ", nodeId=" + nodeId + "]");
+ ", nodeId=" + nodeId + ']');
}
- UUID requestId = msg.getRequestId();
+ UUID reqId = msg.getRequestId();
final GridChangeGlobalStateFuture fut = stateChangeFut.get();
- if (fut != null && requestId.equals(fut.requestId)) {
+ if (fut != null && reqId.equals(fut.reqId)) {
if (fut.initFut.isDone())
fut.onResponse(nodeId, msg);
else {
@@ -1403,11 +1506,8 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
private void onStateRestored(BaselineTopology blt) {
DiscoveryDataClusterState state = globalState;
- if (!state.active() && !state.transition() && state.baselineTopology() == null) {
- DiscoveryDataClusterState newState = DiscoveryDataClusterState.createState(false, blt);
-
- globalState = newState;
- }
+ if (!state.active() && !state.transition() && state.baselineTopology() == null)
+ globalState = DiscoveryDataClusterState.createState(false, blt);
}
/**
@@ -1518,7 +1618,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
}
/** {@inheritDoc} */
- @Override public void onExchangeFinishedOnCoordinator(IgniteInternalFuture exchangeFuture, boolean hasMovingPartitions) {
+ @Override public void onExchangeFinishedOnCoordinator(IgniteInternalFuture exchFut, boolean hasMovingPartitions) {
// no-op
}
@@ -1627,7 +1727,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
private class GridChangeGlobalStateFuture extends GridFutureAdapter<Void> {
/** Request id. */
@GridToStringInclude
- private final UUID requestId;
+ private final UUID reqId;
/** Activate. */
private final boolean activate;
@@ -1657,12 +1757,12 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
private final IgniteLogger log;
/**
- * @param requestId State change request ID.
+ * @param reqId State change request ID.
* @param activate New cluster state.
* @param ctx Context.
*/
- GridChangeGlobalStateFuture(UUID requestId, boolean activate, GridKernalContext ctx) {
- this.requestId = requestId;
+ GridChangeGlobalStateFuture(UUID reqId, boolean activate, GridKernalContext ctx) {
+ this.reqId = reqId;
this.activate = activate;
this.ctx = ctx;
@@ -1670,10 +1770,10 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
}
/**
- * @param event Event.
+ * @param evt Event.
*/
- void onNodeLeft(DiscoveryEvent event) {
- assert event != null;
+ void onNodeLeft(DiscoveryEvent evt) {
+ assert evt != null;
if (isDone())
return;
@@ -1681,7 +1781,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
boolean allReceived = false;
synchronized (mux) {
- if (remaining.remove(event.eventNode().id()))
+ if (remaining.remove(evt.eventNode().id()))
allReceived = remaining.isEmpty();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/txdr/NoOpTransactionalDrProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/txdr/NoOpTransactionalDrProcessor.java
new file mode 100644
index 0000000..ff82b19
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/txdr/NoOpTransactionalDrProcessor.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.txdr;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.processors.GridProcessorAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotOperation;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * No-op implementation of {@link TransactionalDrProcessor}: every callback does nothing and
+ * every {@code should*} query returns {@code false}.
+ */
+public class NoOpTransactionalDrProcessor extends GridProcessorAdapter implements TransactionalDrProcessor {
+ /**
+ * @param ctx Context.
+ */
+ public NoOpTransactionalDrProcessor(GridKernalContext ctx) {
+ super(ctx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onMarkCheckPointBegin(long snapshotId, WALPointer ptr, SnapshotOperation snapshotOperation) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onDeActivate(GridKernalContext kctx) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onPartitionsFullMessagePrepared(
+ @Nullable GridDhtPartitionExchangeId exchId,
+ GridDhtPartitionsFullMessage fullMsg
+ ) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onChangeGlobalStateMessagePrepared(ChangeGlobalStateMessage msg) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean shouldIgnoreAssignPartitionStates(GridDhtPartitionsExchangeFuture fut) {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean shouldScheduleRebalance(GridDhtPartitionsExchangeFuture fut) {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean shouldApplyUpdateCounterOnRebalance() {
+ return false;
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/txdr/TransactionalDrProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/txdr/TransactionalDrProcessor.java
new file mode 100644
index 0000000..8b679a4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/txdr/TransactionalDrProcessor.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.txdr;
+
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.SnapshotRecord;
+import org.apache.ignite.internal.processors.GridProcessor;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotOperation;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
+import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Processor of transactional datacenter replication. It receives callbacks on checkpoint begin mark and on
+ * preparation of partitions full messages and change global state messages, and answers queries that
+ * affect partition state assignment and rebalance.
+ */
+public interface TransactionalDrProcessor extends GridProcessor, IgniteChangeGlobalStateSupport {
+ /**
+ * @param snapshotId Snapshot id.
+ * @param ptr Pointer to the {@link SnapshotRecord}.
+ * @param snapshotOperation Snapshot operation.
+ */
+ public void onMarkCheckPointBegin(long snapshotId, WALPointer ptr, SnapshotOperation snapshotOperation);
+
+ /**
+ * @param exchId Partition exchange id.
+ * This parameter always has a non-null value if the message is created for an exchange.
+ * @param fullMsg Partitions full message.
+ */
+ public void onPartitionsFullMessagePrepared(@Nullable GridDhtPartitionExchangeId exchId,
+ GridDhtPartitionsFullMessage fullMsg);
+
+ /**
+ * @param msg Change global state message.
+ */
+ public void onChangeGlobalStateMessagePrepared(ChangeGlobalStateMessage msg);
+
+ /**
+ * Returns true if we should skip assigning MOVING state to partitions due to outdated counters.
+ * The idea is to avoid redundant rebalance in case of random discovery events on REPLICA cluster.
+ * If event is "special" and rebalance really should happen according to REPLICA lifecycle, method will return true.
+ * <p>
+ * NOTE(review): the last sentence reads as contradictory to the first one (a {@code true} result skips the
+ * MOVING assignment) - confirm the intended return value for "special" events against an implementation.
+ *
+ * @param fut Current exchange future.
+ * @return {@code true} if assigning MOVING state to partitions should be skipped.
+ */
+ public boolean shouldIgnoreAssignPartitionStates(GridDhtPartitionsExchangeFuture fut);
+
+ /**
+ * Returns true if we should schedule rebalance for MOVING partitions even if ideal assignment wasn't changed.
+ *
+ * @param fut Current exchange future.
+ * @return {@code true} if rebalance should be scheduled.
+ */
+ public boolean shouldScheduleRebalance(GridDhtPartitionsExchangeFuture fut);
+
+ /**
+ * Returns {@code true} if update counters that are placed into {@link GridDhtPartitionSupplyMessage#last()}
+ * should be applied when the rebalance is finished.
+ *
+ * @return {@code true} if update counters should be applied when the rebalance is finished.
+ */
+ public boolean shouldApplyUpdateCounterOnRebalance();
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridCircularBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridCircularBuffer.java
index 6a271f9..fade773 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridCircularBuffer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridCircularBuffer.java
@@ -18,6 +18,8 @@ package org.apache.ignite.internal.util;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.util.lang.IgniteInClosureX;
@@ -26,12 +28,13 @@ import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteInClosure;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
* This class implements a circular buffer for efficient data exchange.
*/
-public class GridCircularBuffer<T> {
+public class GridCircularBuffer<T> implements Iterable<T> {
/** */
private final long sizeMask;
@@ -129,6 +132,50 @@ public class GridCircularBuffer<T> {
return arr[idx0].update(idx, t, arr.length, c);
}
+ /**
+  * Creates a read-only iterator over the elements currently held by the buffer.
+  * <p>
+  * The iteration window is computed once, at the moment the iterator is created: trailing slots
+  * that hold no item are skipped, and if every slot is occupied the walk starts at the slot
+  * addressed by the current index generator value (presumably the oldest element - confirm
+  * against the add path), otherwise at slot {@code 0}.
+  * <p>
+  * The iterator can be used concurrently with adding new elements to the buffer,
+  * but the data read through iteration may be inconsistent then.
+  *
+  * @return Iterator.
+  */
+ @NotNull @Override public Iterator<T> iterator() {
+     final int mask = (int)sizeMask;
+
+     // Last slot that holds an item; the slots after it are empty.
+     int last = mask;
+
+     while (last >= 0 && arr[last].item == null)
+         last--;
+
+     final int cnt = last + 1;
+
+     // A partially filled buffer is read from slot 0, a full one - from the generator position.
+     final int start = cnt <= sizeMask ? 0 : idxGen.intValue() & mask;
+
+     return new Iterator<T>() {
+         /** Slot to read next. */
+         private int pos = start;
+
+         /** Number of elements left to return. */
+         private int left = cnt;
+
+         /** {@inheritDoc} */
+         @Override public boolean hasNext() {
+             return left > 0;
+         }
+
+         /** {@inheritDoc} */
+         @Override public T next() {
+             if (!hasNext())
+                 throw new NoSuchElementException();
+
+             T res = arr[pos].item;
+
+             // Wrap around once the end of the backing array is passed.
+             if (++pos > sizeMask)
+                 pos = 0;
+
+             left--;
+
+             return res;
+         }
+     };
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridCircularBuffer.class, this);
@@ -202,16 +249,19 @@ public class GridCircularBuffer<T> {
idx = newIdx; // Index should be updated even if closure fails.
- if (c != null && item != null)
- c.applyx(item);
+ try {
+ if (c != null && item != null)
+ c.applyx(item);
- V old = item;
+ V old = item;
- item = newItem;
+ item = newItem;
- notifyAll();
-
- return old;
+ return old;
+ }
+ finally {
+ notifyAll();
+ }
}
/**
@@ -226,4 +276,4 @@ public class GridCircularBuffer<T> {
return S.toString(Item.class, this, "hash=" + System.identityHashCode(this));
}
}
-}
\ No newline at end of file
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 84d6510..81a90bf 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -1928,6 +1928,17 @@ public abstract class IgniteUtils {
* @param <E> Entry type
* @return Sealed collection.
*/
+ public static <E> Set<E> sealSet(Collection<E> c) {
+     // Copy first, so that later changes to the source are not visible through the sealed view.
+     Set<E> cp = new HashSet<>(c);
+
+     return Collections.unmodifiableSet(cp);
+ }
+
+ /**
+ * Seal collection.
+ *
+ * @param c Collection to seal.
+ * @param <E> Entry type
+ * @return Sealed collection.
+ */
public static <E> List<E> sealList(Collection<E> c) {
return Collections.unmodifiableList(new ArrayList<>(c));
}
@@ -9503,6 +9514,32 @@ public abstract class IgniteUtils {
}
/**
+ * Creates new empty {@link Set} based on {@link ConcurrentHashMap}.
+ *
+ * @param <T> Type of elements.
+ * @return New concurrent set.
+ */
+ public static <T> Set<T> newConcurrentHashSet() {
+     ConcurrentHashMap<T, Boolean> backing = new ConcurrentHashMap<>();
+
+     return Collections.newSetFromMap(backing);
+ }
+
+ /**
+  * Constructs a new {@link Set} based on {@link ConcurrentHashMap},
+  * containing the elements in the specified collection.
+  * <p>
+  * The source may hold any subtype of {@code T}. Its elements are copied into the new set,
+  * so later changes to {@code c} are not reflected in the result.
+  *
+  * @param <T> Type of elements.
+  * @param c Source collection, must not be {@code null}.
+  * @return New concurrent set.
+  */
+ public static <T> Set<T> newConcurrentHashSet(Collection<? extends T> c) {
+     Set<T> set = newConcurrentHashSet();
+
+     set.addAll(c);
+
+     return set;
+ }
+
+ /**
* Creates new map that limited by size.
*
* @param limit Limit for size.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
index a57eb95..694636f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
@@ -290,6 +290,16 @@ public class StripedExecutor implements ExecutorService {
}
/**
+ * Gets the queue size of a single stripe. The index is mapped onto the stripes array by modulo,
+ * so values beyond the number of stripes wrap around.
+ *
+ * @param idx Stripe index, must be non-negative.
+ * @return Queue size of specific stripe.
+ */
+ public int queueSize(int idx) {
+     A.ensure(idx >= 0, "Stripe index should be non-negative: " + idx);
+
+     int stripeIdx = idx % stripes.length;
+
+     return stripes[stripeIdx].queueSize();
+ }
+
+ /**
* @return Completed tasks count.
*/
public long completedTasks() {
diff --git a/modules/core/src/main/java/org/apache/ignite/logger/EchoingLogger.java b/modules/core/src/main/java/org/apache/ignite/logger/EchoingLogger.java
new file mode 100644
index 0000000..ebc8241
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/logger/EchoingLogger.java
@@ -0,0 +1,144 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.logger;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteLogger;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link IgniteLogger} wrapper that echoes log messages to arbitrary target.
+ */
+public class EchoingLogger implements IgniteLogger {
+ /** */
+ private final IgniteLogger delegate;
+
+ /** */
+ private final Consumer<String> echoTo;
+
+ /**
+ * @param echoTo Echo to.
+ * @param delegate Delegate.
+ */
+ public EchoingLogger(@NotNull IgniteLogger delegate, @NotNull Consumer<String> echoTo) {
+ this.delegate = Objects.requireNonNull(delegate);
+ this.echoTo = Objects.requireNonNull(echoTo);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteLogger getLogger(Object ctgr) {
+ return new EchoingLogger(delegate.getLogger(ctgr), echoTo);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void trace(String msg) {
+ if (delegate.isTraceEnabled()) {
+ delegate.trace(msg);
+
+ echoTo.accept(String.format("[%-23s][%-5s] %s", now(), "TRACE", msg));
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void debug(String msg) {
+ if (delegate.isDebugEnabled()) {
+ delegate.debug(msg);
+
+ echoTo.accept(String.format("[%-23s][%-5s] %s", now(), "DEBUG", msg));
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void info(String msg) {
+ if (delegate.isInfoEnabled()) {
+ delegate.info(msg);
+
+ echoTo.accept(String.format("[%-23s][%-5s] %s", now(), "INFO", msg));
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void warning(String msg, @Nullable Throwable e) {
+ delegate.warning(msg, e);
+
+ echoTo.accept(String.format("[%-23s][%-5s] %s%s", now(), "WARN", msg, formatThrowable(Optional.ofNullable(e))));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void error(String msg, @Nullable Throwable e) {
+ delegate.error(msg, e);
+
+ echoTo.accept(String.format("[%-23s][%-5s] %s%s", now(), "ERROR", msg, formatThrowable(Optional.ofNullable(e))));
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isTraceEnabled() {
+ return delegate.isTraceEnabled();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isDebugEnabled() {
+ return delegate.isDebugEnabled();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isInfoEnabled() {
+ return delegate.isInfoEnabled();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isQuiet() {
+ return delegate.isQuiet();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String fileName() {
+ return delegate.fileName();
+ }
+
+ /**
+ *
+ */
+ private static String now() {
+ return LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME);
+ }
+
+ /**
+ *
+ */
+ private static String formatThrowable(Optional<Throwable> e) {
+ return e.map(EchoingLogger::formatThrowable).orElse("");
+ }
+
+ /**
+ *
+ */
+ private static String formatThrowable(@NotNull Throwable e) {
+ return (e.getMessage() != null ? ": " + e.getMessage() : "") + System.lineSeparator() +
+ Arrays.stream(e.getStackTrace())
+ .map(StackTraceElement::toString)
+ .map(s -> "at " + s)
+ .collect(Collectors.joining(System.lineSeparator()));
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 77c365d..46beeeb 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1491,6 +1491,16 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements IgniteDiscovery
}
/**
+ * Sets grid start time, replacing the value currently stored in this SPI instance.
+ *
+ * @param val New time value.
+ */
+ @SuppressWarnings("unused")
+ public void setGridStartTime(long val) {
+     gridStartTime = val;
+ }
+
+ /**
* @param sockAddr Remote address.
* @param timeoutHelper Timeout helper.
* @return Opened socket.
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterReadOnlyModeAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterReadOnlyModeAbstractTest.java
index 7d09c23..c0c3289 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterReadOnlyModeAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterReadOnlyModeAbstractTest.java
@@ -24,6 +24,7 @@ import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteClusterReadOnlyException;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -119,4 +120,13 @@ public class ClusterReadOnlyModeAbstractTest extends GridCommonAbstractTest {
ignite.context().cache().context().readOnlyMode(readOnly);
}
}
+
+ /**
+  * Asserts that the deepest cause in the chain of the given exception is
+  * {@link IgniteClusterReadOnlyException}. Does nothing when {@code e} is {@code null}.
+  *
+  * @param e Exception.
+  */
+ public static void checkThatRootCauseIsReadOnly(Throwable e) {
+     if (e == null)
+         return;
+
+     Throwable root = e;
+
+     // Descend to the last throwable in the chain - the one that has no cause.
+     while (root.getCause() != null)
+         root = root.getCause();
+
+     assertTrue(root.getMessage(), root instanceof IgniteClusterReadOnlyException);
+ }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterReadOnlyModeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterReadOnlyModeTest.java
index be9bf30..c0f243a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterReadOnlyModeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterReadOnlyModeTest.java
@@ -84,7 +84,7 @@ public class ClusterReadOnlyModeTest extends ClusterReadOnlyModeAbstractTest {
fail("Put must fail for cache " + cacheName);
}
catch (Exception e) {
- // No-op.
+ checkThatRootCauseIsReadOnly(e);
}
// All removes must fail.
@@ -94,7 +94,7 @@ public class ClusterReadOnlyModeTest extends ClusterReadOnlyModeAbstractTest {
fail("Remove must fail for cache " + cacheName);
}
catch (Exception e) {
- // No-op.
+ checkThatRootCauseIsReadOnly(e);
}
}
else {
@@ -115,7 +115,7 @@ public class ClusterReadOnlyModeTest extends ClusterReadOnlyModeAbstractTest {
for (Ignite ignite : G.allGrids()) {
for (String cacheName : CACHE_NAMES) {
- boolean failed = false;
+ Throwable failed = null;
try (IgniteDataStreamer<Integer, Integer> streamer = ignite.dataStreamer(cacheName)) {
for (int i = 0; i < 10; i++) {
@@ -125,11 +125,13 @@ public class ClusterReadOnlyModeTest extends ClusterReadOnlyModeAbstractTest {
}
}
catch (CacheException ignored) {
- failed = true;
+ failed = ignored;
}
- if (failed != readOnly)
+ if ((failed == null) == readOnly)
fail("Streaming to " + cacheName + " must " + (readOnly ? "fail" : "succeed"));
+
+ checkThatRootCauseIsReadOnly(failed);
}
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java
index 416a726..62863ab 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java
@@ -166,6 +166,8 @@ public class LocalWalModeChangeDuringRebalancingSelfTest extends GridCommonAbstr
System.setProperty(IgniteSystemProperties.IGNITE_PENDING_TX_TRACKER_ENABLED,
Boolean.toString(enablePendingTxTracker));
+ System.setProperty(IgniteSystemProperties.IGNITE_WAL_LOG_TX_RECORDS, Boolean.toString(enablePendingTxTracker));
+
return cfg;
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteNodeStoppedDuringDisableWALTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteNodeStoppedDuringDisableWALTest.java
index 855f3a8..2ef8878 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteNodeStoppedDuringDisableWALTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteNodeStoppedDuringDisableWALTest.java
@@ -170,7 +170,7 @@ public class IgniteNodeStoppedDuringDisableWALTest extends GridCommonAbstractTes
boolean fail = false;
try (WALIterator it = sharedContext.wal().replay(null)) {
- dbMgr.applyUpdatesOnRecovery(it, (ptr, rec) -> true, (entry) -> true);
+ dbMgr.applyUpdatesOnRecovery(it, (ptr, rec) -> true, (rec, entry) -> true);
}
catch (IgniteCheckedException e) {
if (nodeStopPoint.needCleanUp)
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
index a1ae17e..a4c7a13 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
@@ -20,6 +20,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.logger.NullLogger;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Test;
@@ -42,7 +43,7 @@ public class SegmentAwareTest {
*/
@Test
public void testAvoidDeadlockArchiverAndLockStorage() throws IgniteCheckedException {
- SegmentAware aware = new SegmentAware(10, false);
+ SegmentAware aware = new SegmentAware(10, false, new NullLogger());
int iterationCnt = 100_000;
int segmentToHandle = 1;
@@ -80,7 +81,7 @@ public class SegmentAwareTest {
@Test
public void testFinishAwaitSegment_WhenExactWaitingSegmentWasSet() throws IgniteCheckedException, InterruptedException {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(10, false);
+ SegmentAware aware = new SegmentAware(10, false, new NullLogger());
IgniteInternalFuture future = awaitThread(() -> aware.awaitSegment(5));
@@ -97,7 +98,7 @@ public class SegmentAwareTest {
@Test
public void testFinishAwaitSegment_WhenGreaterThanWaitingSegmentWasSet() throws IgniteCheckedException, InterruptedException {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(10, false);
+ SegmentAware aware = new SegmentAware(10, false, new NullLogger());
IgniteInternalFuture future = awaitThread(() -> aware.awaitSegment(5));
@@ -114,7 +115,7 @@ public class SegmentAwareTest {
@Test
public void testFinishAwaitSegment_WhenNextSegmentEqualToWaitingOne() throws IgniteCheckedException, InterruptedException {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(10, false);
+ SegmentAware aware = new SegmentAware(10, false, new NullLogger());
IgniteInternalFuture future = awaitThread(() -> aware.awaitSegment(5));
@@ -137,7 +138,7 @@ public class SegmentAwareTest {
@Test
public void testFinishAwaitSegment_WhenInterruptWasCall() throws IgniteCheckedException, InterruptedException {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(10, false);
+ SegmentAware aware = new SegmentAware(10, false, new NullLogger());
IgniteInternalFuture future = awaitThread(() -> aware.awaitSegment(5));
@@ -154,7 +155,7 @@ public class SegmentAwareTest {
@Test
public void testFinishWaitSegmentForArchive_WhenWorkSegmentIncremented() throws IgniteCheckedException, InterruptedException {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(10, false);
+ SegmentAware aware = new SegmentAware(10, false, new NullLogger());
aware.curAbsWalIdx(5);
aware.setLastArchivedAbsoluteIndex(4);
@@ -174,7 +175,7 @@ public class SegmentAwareTest {
@Test
public void testFinishWaitSegmentForArchive_WhenWorkSegmentGreaterValue() throws IgniteCheckedException, InterruptedException {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(10, false);
+ SegmentAware aware = new SegmentAware(10, false, new NullLogger());
aware.curAbsWalIdx(5);
aware.setLastArchivedAbsoluteIndex(4);
@@ -194,7 +195,7 @@ public class SegmentAwareTest {
@Test
public void testFinishWaitSegmentForArchive_WhenInterruptWasCall() throws IgniteCheckedException, InterruptedException {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(10, false);
+ SegmentAware aware = new SegmentAware(10, false, new NullLogger());
aware.curAbsWalIdx(5);
aware.setLastArchivedAbsoluteIndex(4);
@@ -214,7 +215,7 @@ public class SegmentAwareTest {
@Test
public void testCorrectCalculateNextSegmentIndex() throws IgniteCheckedException, InterruptedException {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(10, false);
+ SegmentAware aware = new SegmentAware(10, false, new NullLogger());
aware.curAbsWalIdx(5);
@@ -231,7 +232,7 @@ public class SegmentAwareTest {
@Test
public void testFinishWaitNextAbsoluteIndex_WhenMarkAsArchivedFirstSegment() throws IgniteCheckedException, InterruptedException {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(2, false);
+ SegmentAware aware = new SegmentAware(2, false, new NullLogger());
aware.curAbsWalIdx(1);
aware.setLastArchivedAbsoluteIndex(-1);
@@ -251,7 +252,7 @@ public class SegmentAwareTest {
@Test
public void testFinishWaitNextAbsoluteIndex_WhenSetToArchivedFirst() throws IgniteCheckedException, InterruptedException {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(2, false);
+ SegmentAware aware = new SegmentAware(2, false, new NullLogger());
aware.curAbsWalIdx(1);
aware.setLastArchivedAbsoluteIndex(-1);
@@ -271,7 +272,7 @@ public class SegmentAwareTest {
@Test
public void testFinishWaitNextAbsoluteIndex_WhenOnlyForceInterruptWasCall() throws IgniteCheckedException, InterruptedException {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(2, false);
+ SegmentAware aware = new SegmentAware(2, false, new NullLogger());
aware.curAbsWalIdx(2);
aware.setLastArchivedAbsoluteIndex(-1);
@@ -297,7 +298,7 @@ public class SegmentAwareTest {
@Test
public void testFinishSegmentArchived_WhenSetExactWaitingSegment() throws IgniteCheckedException, InterruptedException {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(10, false);
+ SegmentAware aware = new SegmentAware(10, false, new NullLogger());
IgniteInternalFuture future = awaitThread(() -> aware.awaitSegmentArchived(5));
@@ -314,7 +315,7 @@ public class SegmentAwareTest {
@Test
public void testFinishSegmentArchived_WhenMarkExactWaitingSegment() throws IgniteCheckedException, InterruptedException {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(10, false);
+ SegmentAware aware = new SegmentAware(10, false, new NullLogger());
IgniteInternalFuture future = awaitThread(() -> aware.awaitSegmentArchived(5));
@@ -331,7 +332,7 @@ public class SegmentAwareTest {
@Test
public void testFinishSegmentArchived_WhenSetGreaterThanWaitingSegment() throws IgniteCheckedException, InterruptedException {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(10, false);
+ SegmentAware aware = new SegmentAware(10, false, new NullLogger());
IgniteInternalFuture future = awaitThread(() -> aware.awaitSegmentArchived(5));
@@ -348,7 +349,7 @@ public class SegmentAwareTest {
@Test
public void testFinishSegmentArchived_WhenMarkGreaterThanWaitingSegment() throws IgniteCheckedException, InterruptedException {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(10, false);
+ SegmentAware aware = new SegmentAware(10, false, new NullLogger());
IgniteInternalFuture future = awaitThread(() -> aware.awaitSegmentArchived(5));
@@ -365,7 +366,7 @@ public class SegmentAwareTest {
@Test
public void testFinishSegmentArchived_WhenInterruptWasCall() throws IgniteCheckedException, InterruptedException {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(10, false);
+ SegmentAware aware = new SegmentAware(10, false, new NullLogger());
aware.curAbsWalIdx(5);
aware.setLastArchivedAbsoluteIndex(4);
@@ -385,7 +386,7 @@ public class SegmentAwareTest {
@Test
public void testMarkAsMovedToArchive_WhenReleaseLockedSegment() throws IgniteCheckedException, InterruptedException {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(10, false);
+ SegmentAware aware = new SegmentAware(10, false, new NullLogger());
aware.checkCanReadArchiveOrReserveWorkSegment(5);
@@ -404,7 +405,7 @@ public class SegmentAwareTest {
@Test
public void testMarkAsMovedToArchive_WhenInterruptWasCall() throws IgniteCheckedException, InterruptedException {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(10, false);
+ SegmentAware aware = new SegmentAware(10, false, new NullLogger());
aware.checkCanReadArchiveOrReserveWorkSegment(5);
IgniteInternalFuture future = awaitThread(() -> aware.markAsMovedToArchive(5));
@@ -425,7 +426,7 @@ public class SegmentAwareTest {
@Test
public void testFinishWaitSegmentToCompress_WhenSetLastArchivedSegment() throws IgniteCheckedException, InterruptedException {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(10, true);
+ SegmentAware aware = new SegmentAware(10, true, new NullLogger());
aware.onSegmentCompressed(5);
@@ -444,7 +445,7 @@ public class SegmentAwareTest {
@Test
public void testFinishWaitSegmentToCompress_WhenMarkLastArchivedSegment() throws IgniteCheckedException, InterruptedException {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(10, true);
+ SegmentAware aware = new SegmentAware(10, true, new NullLogger());
aware.onSegmentCompressed(5);
@@ -462,7 +463,7 @@ public class SegmentAwareTest {
*/
@Test
public void testCorrectCalculateNextCompressSegment() throws IgniteCheckedException, InterruptedException {
- SegmentAware aware = new SegmentAware(10, true);
+ SegmentAware aware = new SegmentAware(10, true, new NullLogger());
aware.setLastArchivedAbsoluteIndex(6);
@@ -476,7 +477,7 @@ public class SegmentAwareTest {
@Test
public void testFinishWaitSegmentToCompress_WhenInterruptWasCall() throws IgniteCheckedException, InterruptedException {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(10, true);
+ SegmentAware aware = new SegmentAware(10, true, new NullLogger());
aware.onSegmentCompressed(5);
IgniteInternalFuture future = awaitThread(aware::waitNextSegmentToCompress);
@@ -493,7 +494,7 @@ public class SegmentAwareTest {
*/
@Test
public void testLastCompressedIdxProperOrdering() throws IgniteInterruptedCheckedException {
- SegmentAware aware = new SegmentAware(10, true);
+ SegmentAware aware = new SegmentAware(10, true, new NullLogger());
for (int i = 0; i < 5; i++) {
aware.setLastArchivedAbsoluteIndex(i);
@@ -518,7 +519,7 @@ public class SegmentAwareTest {
@Test
public void testReserveCorrectly() {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(10, false);
+ SegmentAware aware = new SegmentAware(10, false, new NullLogger());
//when: reserve one segment twice and one segment once.
aware.reserve(5);
@@ -562,7 +563,7 @@ public class SegmentAwareTest {
@Test
public void testAssertFail_WhenReleaseUnreservedSegment() {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(10, false);
+ SegmentAware aware = new SegmentAware(10, false, new NullLogger());
aware.reserve(5);
try {
@@ -582,7 +583,7 @@ public class SegmentAwareTest {
@Test
public void testReserveWorkSegmentCorrectly() {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(10, false);
+ SegmentAware aware = new SegmentAware(10, false, new NullLogger());
//when: lock one segment twice.
aware.checkCanReadArchiveOrReserveWorkSegment(5);
@@ -616,7 +617,7 @@ public class SegmentAwareTest {
@Test
public void testAssertFail_WhenReleaseUnreservedWorkSegment() {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(10, false);
+ SegmentAware aware = new SegmentAware(10, false, new NullLogger());
aware.checkCanReadArchiveOrReserveWorkSegment(5);
try {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIteratorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIteratorTest.java
index 956014f..a6a9b0b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIteratorTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIteratorTest.java
@@ -18,22 +18,32 @@ package org.apache.ignite.internal.processors.cache.persistence.wal.reader;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.util.List;
import java.util.Random;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.WALPointer;
import org.apache.ignite.internal.pagemem.wal.record.RolloverType;
import org.apache.ignite.internal.pagemem.wal.record.SnapshotRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.pagemem.wal.record.WalRecordCacheGroupAware;
+import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionDestroyRecord;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIO;
@@ -53,16 +63,24 @@ import static org.apache.ignite.internal.processors.cache.persistence.wal.reader
* The test check, that StandaloneWalRecordsIterator correctly close file descriptors associated with WAL files.
*/
public class StandaloneWalRecordsIteratorTest extends GridCommonAbstractTest {
+ /** Wal segment size. */
+ private static final int WAL_SEGMENT_SIZE = 512 * 1024;
+
+ /** Wal compaction enabled. */
+ private boolean walCompactionEnabled;
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String name) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(name);
cfg.setDataStorageConfiguration(
- new DataStorageConfiguration().
- setDefaultDataRegionConfiguration(
+ new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(
new DataRegionConfiguration()
.setPersistenceEnabled(true)
)
+ .setWalSegmentSize(WAL_SEGMENT_SIZE)
+ .setWalCompactionEnabled(walCompactionEnabled)
);
return cfg;
@@ -70,6 +88,8 @@ public class StandaloneWalRecordsIteratorTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
+ walCompactionEnabled = false;
+
super.beforeTest();
stopAllGrids();
@@ -89,6 +109,195 @@ public class StandaloneWalRecordsIteratorTest extends GridCommonAbstractTest {
/**
*
*/
+    public void testBlinkingTemporaryFile() throws Exception {
+        // Scenario: a background thread keeps creating and deleting a ".zip.tmp" copy of compacted
+        // segment 1 in the WAL archive while the standalone iterator is run over that archive 20 times.
+        // Every run must still return all PartitionDestroyRecord entries logged from index 100 on.
+        walCompactionEnabled = true;
+
+        IgniteEx ig = (IgniteEx)startGrid();
+
+        String archiveWalDir = getArchiveWalDirPath(ig);
+
+        ig.cluster().active(true);
+
+        IgniteCache<Object, Object> cache = ig.getOrCreateCache(
+            new CacheConfiguration<>().setName("c-n").setAffinity(new RendezvousAffinityFunction(false, 32)));
+
+        IgniteCacheDatabaseSharedManager sharedMgr = ig.context().cache().context().database();
+
+        IgniteWriteAheadLogManager walMgr = ig.context().cache().context().wal();
+
+        WALPointer fromPtr = null;
+
+        int recordsCnt = WAL_SEGMENT_SIZE / 8 /* record size */ * 5;
+
+        AtomicBoolean stopBlinking = new AtomicBoolean(false);
+        AtomicInteger blinkIterations = new AtomicInteger(0);
+
+        IgniteInternalFuture blinkFut = GridTestUtils.runAsync(new Runnable() {
+            @Override public void run() {
+                try {
+                    // Wait for compaction, but give up as soon as the test asks to stop, so that
+                    // this thread cannot outlive a test that failed before compaction happened.
+                    while (walMgr.lastCompactedSegment() < 2) {
+                        if (stopBlinking.get())
+                            return;
+
+                        U.sleep(10);
+                    }
+
+                    File walArchive = new File(archiveWalDir);
+                    File consIdFolder = new File(walArchive, "node00-" + ig.cluster().localNode().consistentId().toString());
+                    File compressedWalSegment = new File(consIdFolder, FileDescriptor.fileName(1) + ".zip");
+                    File compressedTmpWalSegment = new File(consIdFolder, FileDescriptor.fileName(1) + ".zip.tmp");
+
+                    while (!stopBlinking.get()) {
+                        Files.copy(compressedWalSegment.toPath(), compressedTmpWalSegment.toPath());
+
+                        U.sleep(10);
+
+                        U.delete(compressedTmpWalSegment);
+
+                        blinkIterations.incrementAndGet();
+                    }
+                }
+                catch (IgniteInterruptedCheckedException | IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }, "blinky");
+
+        try {
+            for (int i = 0; i < recordsCnt; i++) {
+                WALPointer ptr = walMgr.log(new PartitionDestroyRecord(i, i));
+
+                if (i == 100)
+                    fromPtr = ptr;
+            }
+
+            assertNotNull(fromPtr);
+
+            cache.put(1, 1);
+
+            forceCheckpoint();
+
+            // Generate WAL segments for filling WAL archive folder.
+            for (int i = 0; i < 2 * ig.configuration().getDataStorageConfiguration().getWalSegments(); i++) {
+                sharedMgr.checkpointReadLock();
+
+                try {
+                    walMgr.log(new SnapshotRecord(i, false), RolloverType.NEXT_SEGMENT);
+                }
+                finally {
+                    sharedMgr.checkpointReadUnlock();
+                }
+            }
+
+            cache.put(2, 2);
+
+            forceCheckpoint();
+
+            log.info("@@@ " + blinkIterations.get() + " blink iterations already completed");
+
+            U.sleep(5000);
+
+            stopGrid();
+
+            for (int i = 0; i < 20; i++) {
+                TreeSet<Integer> foundCounters = new TreeSet<>();
+
+                // The iterator holds file descriptors, so it is closed after every pass.
+                try (WALIterator it = new IgniteWalIteratorFactory(log)
+                    .iterator(new IteratorParametersBuilder().from((FileWALPointer)fromPtr).filesOrDirs(archiveWalDir))) {
+                    it.forEach(x -> {
+                        WALRecord rec = x.get2();
+
+                        if (rec instanceof PartitionDestroyRecord)
+                            foundCounters.add(((WalRecordCacheGroupAware)rec).groupId());
+                    });
+                }
+
+                assertEquals(Integer.valueOf(100), foundCounters.first());
+                assertEquals(Integer.valueOf(recordsCnt - 1), foundCounters.last());
+                assertEquals(recordsCnt - 100, foundCounters.size());
+
+                log.info("@@@ " + blinkIterations.get() + " blink iterations already completed");
+            }
+        }
+        finally {
+            // Always release the blinker thread, including when an assertion above fails.
+            stopBlinking.set(true);
+        }
+
+        log.info("@@@ " + blinkIterations.get() + " blink iterations finally completed");
+
+        blinkFut.get();
+    }
+
+    /**
+     * Logs enough {@link PartitionDestroyRecord} entries to span several WAL segments, remembers the
+     * pointer of the record with index 100 and checks that a standalone iterator started from that
+     * pointer over the archive directory returns every record from index 100 up to the last one.
+     *
+     * @throws Exception If failed.
+     */
+    public void testBoundedIterationOverSeveralSegments() throws Exception {
+        walCompactionEnabled = true;
+
+        IgniteEx ig = (IgniteEx)startGrid();
+
+        String archiveWalDir = getArchiveWalDirPath(ig);
+
+        ig.cluster().active(true);
+
+        IgniteCache<Object, Object> cache = ig.getOrCreateCache(
+            new CacheConfiguration<>().setName("c-n").setAffinity(new RendezvousAffinityFunction(false, 32)));
+
+        IgniteCacheDatabaseSharedManager sharedMgr = ig.context().cache().context().database();
+
+        IgniteWriteAheadLogManager walMgr = ig.context().cache().context().wal();
+
+        WALPointer fromPtr = null;
+
+        int recordsCnt = WAL_SEGMENT_SIZE / 8 /* record size */ * 5;
+
+        for (int i = 0; i < recordsCnt; i++) {
+            WALPointer ptr = walMgr.log(new PartitionDestroyRecord(i, i));
+
+            if (i == 100)
+                fromPtr = ptr;
+        }
+
+        assertNotNull(fromPtr);
+
+        cache.put(1, 1);
+
+        forceCheckpoint();
+
+        // Generate WAL segments for filling WAL archive folder.
+        for (int i = 0; i < 2 * ig.configuration().getDataStorageConfiguration().getWalSegments(); i++) {
+            sharedMgr.checkpointReadLock();
+
+            try {
+                walMgr.log(new SnapshotRecord(i, false), RolloverType.NEXT_SEGMENT);
+            }
+            finally {
+                sharedMgr.checkpointReadUnlock();
+            }
+        }
+
+        cache.put(2, 2);
+
+        forceCheckpoint();
+
+        U.sleep(5000);
+
+        stopGrid();
+
+        TreeSet<Integer> foundCounters = new TreeSet<>();
+
+        // The iterator holds file descriptors, so it is closed as soon as the scan is over.
+        try (WALIterator it = new IgniteWalIteratorFactory(log)
+            .iterator(new IteratorParametersBuilder().from((FileWALPointer)fromPtr).filesOrDirs(archiveWalDir))) {
+            it.forEach(x -> {
+                WALRecord rec = x.get2();
+
+                if (rec instanceof PartitionDestroyRecord)
+                    foundCounters.add(((WalRecordCacheGroupAware)rec).groupId());
+            });
+        }
+
+        assertEquals(Integer.valueOf(100), foundCounters.first());
+        assertEquals(Integer.valueOf(recordsCnt - 1), foundCounters.last());
+        assertEquals(recordsCnt - 100, foundCounters.size());
+    }
+
+ /**
+ * Check correct closing file descriptors.
+ *
+ */
private String createWalFiles() throws Exception {
IgniteEx ig = (IgniteEx)startGrid();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/LocalPendingTransactionsTrackerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/LocalPendingTransactionsTrackerTest.java
new file mode 100644
index 0000000..39ed106
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/LocalPendingTransactionsTrackerTest.java
@@ -0,0 +1,853 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
+import org.apache.ignite.internal.util.GridDebug;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.logger.GridTestLog4jLogger;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Unit test for {@link LocalPendingTransactionsTracker}
+ */
+public class LocalPendingTransactionsTrackerTest {
+ /** Timeout executor. */
+ private static ScheduledExecutorService timeoutExecutor;
+
+ /** Tracker. */
+ private LocalPendingTransactionsTracker tracker;
+
+ /**
+ * Creates the scheduler shared by all tests of this class, calls {@code U.onGridStart()} and sets the
+ * {@code IGNITE_PENDING_TX_TRACKER_ENABLED} system property to {@code "true"}.
+ */
+ @BeforeClass
+ public static void setUpClass() {
+ timeoutExecutor = new ScheduledThreadPoolExecutor(1);
+
+ U.onGridStart();
+
+ System.setProperty(IgniteSystemProperties.IGNITE_PENDING_TX_TRACKER_ENABLED, "true");
+ }
+
+ /**
+ * Shuts down the shared scheduler and clears the {@code IGNITE_PENDING_TX_TRACKER_ENABLED} system
+ * property set in {@link #setUpClass()}.
+ */
+ @AfterClass
+ public static void tearDownClass() {
+ timeoutExecutor.shutdown();
+
+ System.clearProperty(IgniteSystemProperties.IGNITE_PENDING_TX_TRACKER_ENABLED);
+ }
+
+ /**
+ * Creates a fresh tracker for every test on top of a mocked shared cache context.
+ * The mocked timeout processor does not run timeout objects itself: each object passed to
+ * {@code addTimeoutObject} is scheduled on {@link #timeoutExecutor} instead.
+ */
+ @Before
+ public void setUp() {
+ GridTimeoutProcessor time = Mockito.mock(GridTimeoutProcessor.class);
+ Mockito.when(time.addTimeoutObject(Mockito.any())).thenAnswer(mock -> {
+ GridTimeoutObject timeoutObj = (GridTimeoutObject)mock.getArguments()[0];
+
+ long endTime = timeoutObj.endTime();
+
+ // Delay is the time left until the object's end time, measured from now.
+ timeoutExecutor.schedule(timeoutObj::onTimeout, endTime - U.currentTimeMillis(), TimeUnit.MILLISECONDS);
+
+ return null;
+ });
+
+ // The context mock only answers time() and logger(LocalPendingTransactionsTracker.class).
+ GridCacheSharedContext<?, ?> cctx = Mockito.mock(GridCacheSharedContext.class);
+ Mockito.when(cctx.time()).thenReturn(time);
+ Mockito.when(cctx.logger(LocalPendingTransactionsTracker.class)).thenReturn(new GridTestLog4jLogger());
+
+ tracker = new LocalPendingTransactionsTracker(cctx);
+ }
+
+ /**
+ * Checks that {@code currentlyPreparedTxs()} reports exactly the transactions that were prepared but
+ * not committed yet, both before and after one more transaction commits.
+ */
+ @Test
+ public void testCurrentlyPreparedTxs() {
+ txPrepare(1);
+ txKeyWrite(1, 10);
+ txKeyWrite(1, 11);
+
+ txPrepare(2);
+ txKeyWrite(2, 20);
+ txKeyWrite(2, 21);
+ txKeyWrite(2, 22);
+
+ txPrepare(3);
+ txKeyWrite(3, 30);
+
+ txCommit(2);
+
+ tracker.writeLockState();
+
+ try {
+ Set<GridCacheVersion> currentlyPreparedTxs = tracker.currentlyPreparedTxs();
+
+ // Tx 2 is committed, so only txs 1 and 3 are expected.
+ assertEquals(2, currentlyPreparedTxs.size());
+ assertTrue(currentlyPreparedTxs.contains(nearXidVersion(1)));
+ assertTrue(currentlyPreparedTxs.contains(nearXidVersion(3)));
+ }
+ finally {
+ tracker.writeUnlockState();
+ }
+
+ txKeyWrite(3, 31);
+ txCommit(3);
+
+ tracker.writeLockState();
+
+ try {
+ Set<GridCacheVersion> currentlyPreparedTxs = tracker.currentlyPreparedTxs();
+
+ // After tx 3 commits only tx 1 is left.
+ assertEquals(1, currentlyPreparedTxs.size());
+ assertTrue(currentlyPreparedTxs.contains(nearXidVersion(1)));
+ }
+ finally {
+ tracker.writeUnlockState();
+ }
+ }
+
+ /**
+ * Checks the case when the same transaction is prepared and committed several times: a transaction
+ * is expected to stay in {@code currentlyPreparedTxs()} until it has as many commits as prepares.
+ */
+ @Test
+ public void testMultiplePrepareCommitMarkers() {
+ txPrepare(1);
+ txKeyWrite(1, 10);
+
+ txPrepare(2);
+ txKeyWrite(2, 20);
+ txPrepare(2);
+ txKeyWrite(2, 21);
+ txPrepare(2);
+ txKeyWrite(2, 22);
+
+ txPrepare(3);
+ txKeyWrite(3, 30);
+ txPrepare(3);
+ txKeyWrite(3, 31);
+
+ txCommit(3);
+ txCommit(3);
+
+ txCommit(1);
+
+ // Tx 2 was prepared three times but is committed only twice.
+ txCommit(2);
+ txCommit(2);
+
+ tracker.writeLockState();
+
+ try {
+ Set<GridCacheVersion> currentlyPreparedTxs = tracker.currentlyPreparedTxs();
+
+ assertEquals(1, currentlyPreparedTxs.size());
+ assertTrue(currentlyPreparedTxs.contains(nearXidVersion(2)));
+ }
+ finally {
+ tracker.writeUnlockState();
+ }
+ }
+
+    /**
+     * Checks that the tracker rejects a commit of a transaction that already has as many commits as
+     * prepares.
+     */
+    @Test
+    public void testCommitsMoreThanPreparesForbidden() {
+        txPrepare(1);
+
+        txKeyWrite(1, 10);
+        txKeyWrite(1, 11);
+
+        txCommit(1);
+
+        boolean rejected = false;
+
+        try {
+            txCommit(1);
+        }
+        catch (Throwable ignored) {
+            // Expected. Throwable is caught on purpose: whether the tracker raises an exception or an
+            // assertion error here is not visible from this test.
+            rejected = true;
+        }
+
+        // The check lives outside the try block: fail() throws AssertionError, which the catch above
+        // would swallow, making the test pass unconditionally.
+        assertTrue("We should fail if number of commits is more than number of prepares.", rejected);
+    }
+
+ /**
+ * Checks that rolled back transactions are not reported by {@code currentlyPreparedTxs()}, whether
+ * the rollback happens before any prepare, after a prepare, or after a partial commit.
+ */
+ @Test
+ public void testRollback() {
+ txRollback(1); // Tx can be rolled back before prepare.
+
+ txPrepare(2);
+ txKeyWrite(2, 20);
+
+ txPrepare(3);
+ txKeyWrite(3, 30);
+ txPrepare(3);
+ txKeyWrite(3, 31);
+
+ // Tx 3 has two prepares and one commit when it is rolled back below.
+ txCommit(3);
+
+ txRollback(2);
+ txRollback(3);
+
+ tracker.writeLockState();
+
+ try {
+ Set<GridCacheVersion> currentlyPreparedTxs = tracker.currentlyPreparedTxs();
+
+ assertEquals(0, currentlyPreparedTxs.size());
+ }
+ finally {
+ tracker.writeUnlockState();
+ }
+ }
+
+ /**
+ * Starts awaiting of pending transactions while txs 1, 2, 3 and 5 have fewer commits than prepares,
+ * then finishes only tx 2 and expects the future to report txs 1, 3 and 5. After those are committed
+ * as well, a second await is expected to return an empty set.
+ *
+ * @throws Exception If failed.
+ */
+ @Test(timeout = 10_000)
+ public void testAwaitFinishOfPreparedTxs() throws Exception {
+ txPrepare(1);
+
+ txPrepare(2);
+ txPrepare(2);
+
+ txPrepare(3);
+ txPrepare(3);
+ txCommit(3);
+
+ txPrepare(4);
+ txCommit(4);
+
+ txPrepare(5);
+ txPrepare(5);
+ txPrepare(5);
+ txCommit(5);
+
+ tracker.writeLockState();
+
+ IgniteInternalFuture<Set<GridCacheVersion>> fut;
+ try {
+ tracker.startTxFinishAwaiting(1_000, 10_000);
+
+ fut = tracker.awaitPendingTxsFinished(Collections.emptySet());
+ }
+ finally {
+ tracker.writeUnlockState();
+ }
+
+ Thread.sleep(100);
+
+ // Tx 5 gets its second of three commits; tx 2 gets both of its commits.
+ txCommit(5);
+ txCommit(2);
+ txCommit(2);
+
+ long curTs = U.currentTimeMillis();
+
+ Set<GridCacheVersion> pendingTxs = fut.get();
+
+ // NOTE(review): the 1_000 ms bound presumably matches the first timeout passed to
+ // startTxFinishAwaiting above; confirm against the tracker implementation.
+ assertTrue("Waiting for awaitFinishOfPreparedTxs future too long", U.currentTimeMillis() - curTs < 1_000);
+
+ assertEquals(3, pendingTxs.size());
+ assertTrue(pendingTxs.contains(nearXidVersion(1)));
+ assertTrue(pendingTxs.contains(nearXidVersion(3)));
+ assertTrue(pendingTxs.contains(nearXidVersion(5)));
+
+ // Supply the missing commits so that nothing is pending for the second await.
+ txCommit(1);
+ txCommit(3);
+ txCommit(5);
+
+ tracker.writeLockState();
+
+ try {
+ tracker.startTxFinishAwaiting(1_000, 10_000);
+
+ fut = tracker.awaitPendingTxsFinished(Collections.emptySet());
+ }
+ finally {
+ tracker.writeUnlockState();
+ }
+
+ assertTrue(fut.get().isEmpty());
+ }
+
+ /**
+ * Measures how long {@link #awaitFinishOfPreparedTxs(long, long)} blocks and which transactions it
+ * reports while a helper thread commits txs 2, 3 and 4 in stages gated by latches.
+ * The test is timing-based: every measured wait is checked against a window around 200 ms.
+ *
+ * @throws Exception If failed.
+ */
+ @Test(timeout = 10_000)
+ public void testAwaitFinishOfPreparedTxsTimeouts() throws Exception {
+ txPrepare(1);
+ txCommit(1);
+
+ txPrepare(2);
+ txKeyRead(2, 10);
+
+ txPrepare(3);
+ txKeyWrite(3, 11);
+
+ txPrepare(4);
+
+ long curTs, waitMs;
+
+ Set<GridCacheVersion> pendingTxs;
+
+ final CountDownLatch latch1 = new CountDownLatch(1);
+ final CountDownLatch latch2 = new CountDownLatch(1);
+ final CountDownLatch latch3 = new CountDownLatch(1);
+ final CountDownLatch latch4 = new CountDownLatch(1);
+
+ // Helper thread: commits tx 2 right after latch2, tx 3 200 ms after latch3, tx 4 200 ms after latch4.
+ new Thread(() -> {
+ try {
+ latch1.countDown();
+
+ latch2.await();
+
+ txCommit(2);
+
+ latch3.await();
+
+ Thread.sleep(200);
+ txCommit(3);
+
+ latch4.await();
+
+ Thread.sleep(200);
+ txCommit(4);
+ }
+ catch (InterruptedException ignored) {
+ // No-op
+ }
+ }).start();
+
+ latch1.await();
+
+ // Nothing is committed during this await, so txs 2, 3 and 4 are all reported.
+ pendingTxs = awaitFinishOfPreparedTxs(100, 200);
+
+ assertEquals(3, pendingTxs.size());
+ assertTrue(pendingTxs.contains(nearXidVersion(2)));
+ assertTrue(pendingTxs.contains(nearXidVersion(3)));
+ assertTrue(pendingTxs.contains(nearXidVersion(4)));
+
+ latch2.countDown();
+
+ curTs = U.currentTimeMillis();
+
+ pendingTxs = awaitFinishOfPreparedTxs(100, 200);
+
+ waitMs = U.currentTimeMillis() - curTs;
+
+ assertTrue("Waiting for awaitFinishOfPreparedTxs future too short: " + waitMs, waitMs > 200 - 50);
+ assertTrue("Waiting for awaitFinishOfPreparedTxs future too long: " + waitMs, waitMs < 200 + 500);
+
+ assertEquals(2, pendingTxs.size());
+ assertTrue(pendingTxs.contains(nearXidVersion(3)));
+ assertTrue(pendingTxs.contains(nearXidVersion(4)));
+
+ latch3.countDown();
+
+ curTs = U.currentTimeMillis();
+
+ pendingTxs = awaitFinishOfPreparedTxs(100, 300);
+
+ waitMs = U.currentTimeMillis() - curTs;
+
+ assertTrue("Waiting for awaitFinishOfPreparedTxs future too short: " + waitMs, waitMs > 200 - 50);
+ assertTrue("Waiting for awaitFinishOfPreparedTxs future too long: " + waitMs, waitMs < 200 + 500);
+
+ assertEquals(1, pendingTxs.size());
+ assertTrue(pendingTxs.contains(nearXidVersion(4)));
+
+ latch4.countDown();
+
+ curTs = U.currentTimeMillis();
+
+ pendingTxs = awaitFinishOfPreparedTxs(300, 500);
+
+ waitMs = U.currentTimeMillis() - curTs;
+
+ assertTrue("Waiting for awaitFinishOfPreparedTxs future too short: " + waitMs, waitMs > 200 - 50);
+ assertTrue("Waiting for awaitFinishOfPreparedTxs future too long: " + waitMs, waitMs < 200 + 500);
+
+ assertTrue(pendingTxs.isEmpty());
+ }
+
+ /**
+ * Checks that {@code stopTrackingCommitted()} reports only the transactions committed after
+ * {@code startTrackingCommitted()} was called.
+ */
+ @Test
+ public void trackingCommittedTest() {
+ // Tx 1 commits before tracking starts and is not expected in the result.
+ txPrepare(1);
+ txCommit(1);
+
+ txPrepare(2);
+
+ tracker.writeLockState();
+ try {
+ tracker.startTrackingCommitted();
+ }
+ finally {
+ tracker.writeUnlockState();
+ }
+
+ txCommit(2);
+
+ txPrepare(3);
+ txCommit(3);
+
+ // Tx 4 is only prepared, so it is not expected in the result either.
+ txPrepare(4);
+
+ tracker.writeLockState();
+
+ Set<GridCacheVersion> committedTxs;
+ try {
+ committedTxs = tracker.stopTrackingCommitted().committedTxs();
+ }
+ finally {
+ tracker.writeUnlockState();
+ }
+
+ assertEquals(2, committedTxs.size());
+ assertTrue(committedTxs.contains(nearXidVersion(2)));
+ assertTrue(committedTxs.contains(nearXidVersion(3)));
+ }
+
+    /**
+     * Checks that {@code stopTrackingPrepared()} reports only the transactions prepared after
+     * {@code startTrackingPrepared()} was called.
+     */
+    @Test
+    public void trackingPreparedTest() {
+        // Txs 1 and 2 are prepared before tracking starts and are not expected in the result.
+        txPrepare(1);
+        txCommit(1);
+
+        txPrepare(2);
+
+        tracker.writeLockState();
+
+        try {
+            tracker.startTrackingPrepared();
+        }
+        finally {
+            tracker.writeUnlockState();
+        }
+
+        txCommit(2);
+
+        txPrepare(3);
+        txCommit(3);
+
+        txPrepare(4);
+
+        Set<GridCacheVersion> preparedTxs;
+
+        tracker.writeLockState();
+
+        try {
+            preparedTxs = tracker.stopTrackingPrepared();
+        }
+        finally {
+            tracker.writeUnlockState();
+        }
+
+        assertEquals(2, preparedTxs.size());
+        assertTrue(preparedTxs.contains(nearXidVersion(3)));
+        assertTrue(preparedTxs.contains(nearXidVersion(4)));
+    }
+
+ /**
+ * Walks through three consecutive "cuts", each taken under the tracker write lock:
+ * cut 1 starts tracking of committed txs and awaits pending ones, cut 2 collects the txs committed
+ * since cut 1 together with the currently prepared ones and starts tracking of prepared txs,
+ * cut 3 collects the txs prepared since cut 2.
+ *
+ * @throws Exception If failed.
+ */
+ @Test(timeout = 10_000)
+ public void testConsistentCutUseCase() throws Exception {
+ txPrepare(1);
+ txPrepare(2);
+ txPrepare(3);
+
+ txCommit(3);
+
+ tracker.writeLockState(); // Cut 1.
+
+ IgniteInternalFuture<Set<GridCacheVersion>> awaitFutCut1;
+ try {
+ tracker.startTrackingCommitted();
+
+ tracker.startTxFinishAwaiting(1_000, 10_000);
+
+ awaitFutCut1 = tracker.awaitPendingTxsFinished(Collections.emptySet());
+ }
+ finally {
+ tracker.writeUnlockState();
+ }
+
+ txCommit(1);
+
+ // Tx 2 is still not committed when the future is read, so it is the only one reported.
+ Set<GridCacheVersion> failedToFinish = awaitFutCut1.get();
+
+ assertEquals(1, failedToFinish.size());
+ assertTrue(failedToFinish.contains(nearXidVersion(2)));
+
+ txCommit(2);
+
+ txPrepare(4);
+ txCommit(4);
+
+ txPrepare(5);
+
+ txPrepare(6);
+
+ tracker.writeLockState(); // Cut 2.
+
+ Set<GridCacheVersion> committedFrom1to2;
+ Set<GridCacheVersion> preparedOn2;
+ try {
+ committedFrom1to2 = tracker.stopTrackingCommitted().committedTxs();
+
+ preparedOn2 = tracker.currentlyPreparedTxs();
+
+ tracker.startTrackingPrepared();
+ }
+ finally {
+ tracker.writeUnlockState();
+ }
+
+ assertEquals(2, preparedOn2.size());
+ assertTrue(preparedOn2.contains(nearXidVersion(5)));
+ assertTrue(preparedOn2.contains(nearXidVersion(6)));
+
+ // Tx 3 committed before cut 1, so it is not part of the tracked set.
+ assertEquals(3, committedFrom1to2.size());
+ assertTrue(committedFrom1to2.contains(nearXidVersion(1)));
+ assertTrue(committedFrom1to2.contains(nearXidVersion(2)));
+ assertTrue(committedFrom1to2.contains(nearXidVersion(4)));
+
+ txPrepare(7);
+ txPrepare(8);
+
+ txCommit(6);
+ txCommit(7);
+
+ tracker.writeLockState(); // Cut 3.
+ Set<GridCacheVersion> preparedFrom2to3;
+ try {
+ preparedFrom2to3 = tracker.stopTrackingPrepared();
+ }
+ finally {
+ tracker.writeUnlockState();
+ }
+
+ // Tx 7 is reported although it has already committed; tx 6 was prepared before cut 2.
+ assertEquals(2, preparedFrom2to3.size());
+ assertTrue(preparedFrom2to3.contains(nearXidVersion(7)));
+ assertTrue(preparedFrom2to3.contains(nearXidVersion(8)));
+ }
+
+ /**
+ * Checks the dependency graph returned by {@code stopTrackingCommitted()}: a later transaction is
+ * expected to depend on an earlier one when it reads or writes a key the earlier one has written,
+ * while reads of a key that was only read before, and writes of such a key, create no dependency.
+ */
+ @Test
+ public void testDependentTransactions() {
+ tracker.writeLockState();
+ try {
+ tracker.startTrackingCommitted();
+ }
+ finally {
+ tracker.writeUnlockState();
+ }
+
+ txPrepare(1);
+ txKeyRead(1, 0);
+ txKeyWrite(1, 10);
+ txKeyRead(1, 20);
+ txCommit(1);
+
+ txPrepare(2);
+ txKeyWrite(2, 30);
+ txKeyWrite(2, 40);
+ txCommit(2);
+
+ txPrepare(3);
+ txKeyRead(3, 10); // (w -> r) is a dependency
+ txCommit(3);
+
+ txPrepare(4);
+ txKeyWrite(4, 20); // (r -> w) is not a dependency
+ txCommit(4);
+
+ txPrepare(5);
+ txKeyRead(5, 30); // (w -> r) is a dependency
+ txCommit(5);
+
+ txPrepare(6);
+ txKeyWrite(6, 40); // (w -> w) is a dependency
+ txCommit(6);
+
+ txPrepare(7);
+ txKeyRead(7, 0); // (r -> r) is not a dependency
+ txCommit(7);
+
+ tracker.writeLockState();
+
+ TrackCommittedResult res;
+ try {
+ res = tracker.stopTrackingCommitted();
+ }
+ finally {
+ tracker.writeUnlockState();
+ }
+
+ // All seven txs committed while tracking was on; only txs 1 and 2 have dependants.
+ assertEquals(7, res.committedTxs().size());
+ assertEquals(2, res.dependentTxsGraph().size());
+
+ assertTrue(res.dependentTxsGraph().containsKey(nearXidVersion(1)));
+ assertTrue(res.dependentTxsGraph().containsKey(nearXidVersion(2)));
+
+ Set<GridCacheVersion> dependentFrom1 = res.dependentTxsGraph().get(nearXidVersion(1));
+ assertEquals(1, dependentFrom1.size());
+ assertTrue(dependentFrom1.contains(nearXidVersion(3)));
+
+ Set<GridCacheVersion> dependentFrom2 = res.dependentTxsGraph().get(nearXidVersion(2));
+ assertEquals(2, dependentFrom2.size());
+ assertTrue(dependentFrom2.contains(nearXidVersion(5)));
+ assertTrue(dependentFrom2.contains(nearXidVersion(6)));
+ }
+
+ /**
+ * Transaction tracker memory leak test.
+ * Compares the heap dump size measured after a short run with the size measured after a five times
+ * longer run; the difference must stay below {@code allowedLeakSize} bytes.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testTrackerMemoryLeak() throws Exception {
+ int allowedLeakSize = 100 * 1024;
+
+ // Warmup phase.
+ long sizeBefore = memoryFootprintForTransactionTracker(1000, 20);
+
+ // Main phase.
+ long sizeAfter = memoryFootprintForTransactionTracker(5000, 20);
+
+ assertTrue("Possible memory leak detected. Memory consumed before transaction tracking: " + sizeBefore +
+ ", memory consumed after transaction tracking: " + sizeAfter, sizeAfter - sizeBefore < allowedLeakSize);
+ }
+
+    /**
+     * Runs random prepare / read / write / commit / rollback sequences against the tracker from
+     * several threads, occasionally switching the tracking state, then brings the tracker back to the
+     * "nothing is tracked" state and measures the size of a heap dump.
+     *
+     * @param iterationsCnt Iterations count.
+     * @param threadsCnt Threads count.
+     * @return Length of dump file.
+     * @throws Exception If failed.
+     */
+    private long memoryFootprintForTransactionTracker(int iterationsCnt, int threadsCnt) throws Exception {
+        AtomicInteger txCnt = new AtomicInteger();
+
+        // Number of tracking state switches done so far; the value modulo 4 selects the next switch.
+        AtomicInteger trackerState = new AtomicInteger();
+
+        File dumpFile = new File(U.defaultWorkDirectory(), "test.hprof");
+
+        String heapDumpFileName = dumpFile.getAbsolutePath();
+
+        Runnable txRunnable = new Runnable() {
+            @Override public void run() {
+                ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                for (int iteration = 0; iteration < iterationsCnt; iteration++) {
+                    int txId = txCnt.incrementAndGet();
+
+                    txPrepare(txId);
+
+                    int opCnt = rnd.nextInt(100);
+
+                    for (int i = 0; i < opCnt; i++) {
+                        if (rnd.nextBoolean())
+                            txKeyRead(txId, rnd.nextInt());
+                        else
+                            txKeyWrite(txId, rnd.nextInt());
+                    }
+
+                    if (rnd.nextInt(10) == 0)
+                        txRollback(txId);
+                    else
+                        txCommit(txId);
+
+                    // Change tracker state
+                    if (rnd.nextInt(20) == 0) {
+                        tracker.writeLockState();
+
+                        try {
+                            int state = trackerState.getAndIncrement();
+
+                            switch (state % 4) {
+                                case 0:
+                                    tracker.startTrackingPrepared();
+
+                                    break;
+                                case 1:
+                                    tracker.stopTrackingPrepared();
+
+                                    break;
+                                case 2:
+                                    tracker.startTrackingCommitted();
+
+                                    break;
+                                case 3:
+                                    tracker.stopTrackingCommitted();
+                            }
+                        }
+                        finally {
+                            tracker.writeUnlockState();
+                        }
+                    }
+                }
+            }
+        };
+
+        GridTestUtils.runMultiThreaded(txRunnable, threadsCnt, "tx-runner");
+
+        tracker.writeLockState();
+
+        try {
+            // Finish the start/stop cycle the worker threads left unfinished.
+            int state = trackerState.get();
+
+            switch (state % 4) {
+                case 0:
+                    break;
+                case 1:
+                    tracker.stopTrackingPrepared();
+                    tracker.startTrackingCommitted();
+                    tracker.stopTrackingCommitted();
+
+                    break;
+                case 2:
+                    tracker.startTrackingCommitted();
+                    tracker.stopTrackingCommitted();
+
+                    break;
+                case 3:
+                    tracker.stopTrackingCommitted();
+            }
+
+        }
+        finally {
+            tracker.writeUnlockState();
+        }
+
+        try {
+            GridDebug.dumpHeap(heapDumpFileName, true);
+
+            return dumpFile.length();
+        }
+        finally {
+            // Remove the dump even when dumping fails, so that no file is left in the work directory.
+            dumpFile.delete();
+        }
+    }
+
+ /**
+ * Notifies the tracker that the test transaction with the given ID has been prepared.
+ *
+ * @param txId Test transaction ID.
+ */
+ private void txPrepare(int txId) {
+ tracker.onTxPrepared(nearXidVersion(txId));
+ }
+
+ /**
+ * Notifies the tracker that the test transaction with the given ID has been committed.
+ *
+ * @param txId Test transaction ID.
+ */
+ private void txCommit(int txId) {
+ tracker.onTxCommitted(nearXidVersion(txId));
+ }
+
+ /**
+ * Notifies the tracker that the test transaction with the given ID has been rolled back.
+ *
+ * @param txId Test transaction ID.
+ */
+ private void txRollback(int txId) {
+ tracker.onTxRolledBack(nearXidVersion(txId));
+ }
+
+    /**
+     * Reports to the tracker that the given test transaction has written a single key.
+     * The key object is built from the integer key and its 4-byte representation.
+     *
+     * @param txId Test transaction ID.
+     * @param key Key.
+     */
+    private void txKeyWrite(int txId, int key) {
+        byte[] keyBytes = ByteBuffer.allocate(4).putInt(key).array();
+
+        KeyCacheObjectImpl writtenKey = new KeyCacheObjectImpl(key, keyBytes, 1);
+
+        tracker.onKeysWritten(nearXidVersion(txId), Collections.singletonList(writtenKey));
+    }
+
+    /**
+     * Reports to the tracker that the given test transaction has read a single key.
+     * The key object is built from the integer key and its 4-byte representation.
+     *
+     * @param txId Test transaction ID.
+     * @param key Key.
+     */
+    private void txKeyRead(int txId, int key) {
+        byte[] keyBytes = ByteBuffer.allocate(4).putInt(key).array();
+
+        KeyCacheObjectImpl readKey = new KeyCacheObjectImpl(key, keyBytes, 1);
+
+        tracker.onKeysRead(nearXidVersion(txId), Collections.singletonList(readKey));
+    }
+
+ /**
+ * Builds the version object used to identify a test transaction; only the second constructor
+ * argument depends on {@code txId}, the other two are always zero.
+ *
+ * @param txId Test transaction ID.
+ * @return Version of transaction for the given {@code txId}.
+ */
+ private GridCacheVersion nearXidVersion(int txId) {
+ return new GridCacheVersion(0, txId, 0);
+ }
+
+ /**
+ * Starts awaiting of pending transactions under the tracker write lock and then, with the lock
+ * already released, blocks until the resulting future completes.
+ *
+ * @param preparedTxsTimeout Prepared transactions timeout.
+ * @param committingTxsTimeout Committing transactions timeout.
+ * @return Collection of local transactions in committing state.
+ * @throws IgniteCheckedException If failed.
+ */
+ private Set<GridCacheVersion> awaitFinishOfPreparedTxs(
+ long preparedTxsTimeout,
+ long committingTxsTimeout
+ ) throws IgniteCheckedException {
+ IgniteInternalFuture<Set<GridCacheVersion>> fut;
+
+ tracker.writeLockState();
+
+ try {
+ tracker.startTxFinishAwaiting(preparedTxsTimeout, committingTxsTimeout);
+
+ fut = tracker.awaitPendingTxsFinished(Collections.emptySet());
+ }
+ finally {
+ tracker.writeUnlockState();
+ }
+
+ // Waiting happens outside the lock.
+ return fut.get();
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/lang/utils/GridCircularBufferSelfTest.java b/modules/core/src/test/java/org/apache/ignite/lang/utils/GridCircularBufferSelfTest.java
index fc995a8..90dfd98 100644
--- a/modules/core/src/test/java/org/apache/ignite/lang/utils/GridCircularBufferSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/lang/utils/GridCircularBufferSelfTest.java
@@ -17,9 +17,13 @@
package org.apache.ignite.lang.utils;
import java.util.Deque;
+import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.IntConsumer;
+import java.util.stream.IntStream;
+import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.internal.util.GridCircularBuffer;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
@@ -177,4 +181,79 @@ public class GridCircularBufferSelfTest extends GridCommonAbstractTest {
info("Buffer: " + buf);
}
+
+ /**
+ * Checks that the iterator of a buffer nothing was added to has no elements.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testEmptyBufIterator() throws Exception {
+ assertFalse(new GridCircularBuffer<>(8).iterator().hasNext());
+ }
+
+ /**
+ * Adds {@code size / 2} consecutive integers to a buffer created with {@code size} and checks that
+ * the iterator returns exactly them, in insertion order.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testHalfFullBufIterator() throws Exception {
+ int size = 8;
+
+ GridCircularBuffer<Integer> buf = new GridCircularBuffer<>(size);
+
+ IntStream.range(0, size / 2).forEach(makeConsumer(buf));
+
+ checkExpectedRange(0, size / 2, buf);
+ }
+
+ /**
+ * Adds {@code size} consecutive integers to a buffer created with {@code size} and checks that the
+ * iterator returns all of them, in insertion order.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testFullBufIterator() throws Exception {
+ int size = 8;
+
+ GridCircularBuffer<Integer> buf = new GridCircularBuffer<>(size);
+
+ IntStream.range(0, size).forEach(makeConsumer(buf));
+
+ checkExpectedRange(0, size, buf);
+ }
+
+ /**
+ * Adds one and a half times {@code size} consecutive integers to a buffer created with {@code size}
+ * and checks that the iterator returns only the last {@code size} of them, in insertion order.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testOverflownBufIterator() throws Exception {
+ int size = 8;
+
+ GridCircularBuffer<Integer> buf = new GridCircularBuffer<>(size);
+
+ // 12 values (0..11) go into the buffer; values 4..11 are expected to remain.
+ IntStream.range(0, 3 * size / 2).forEach(makeConsumer(buf));
+
+ checkExpectedRange(size / 2, 3 * size / 2, buf);
+ }
+
+ /**
+ *
+ */
+ private static IntConsumer makeConsumer(GridCircularBuffer<Integer> buf) {
+ return t -> {
+ try {
+ buf.add(t);
+ }
+ catch (InterruptedException e) {
+ throw new IgniteInterruptedException(e);
+ }
+ };
+ }
+
+    /**
+     * Asserts that the buffer iterator yields exactly the consecutive integers of the given range and
+     * nothing after them.
+     *
+     * @param beginInclusive First expected value.
+     * @param endExclusive Value following the last expected one.
+     * @param buf Buffer to check.
+     */
+    private void checkExpectedRange(int beginInclusive, int endExclusive, GridCircularBuffer<Integer> buf) {
+        Iterator<Integer> bufIter = buf.iterator();
+
+        for (int expected = beginInclusive; expected < endExclusive; expected++)
+            assertEquals(expected, bufIter.next().intValue());
+
+        assertFalse(bufIter.hasNext());
+    }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index 7797f23..f45df11 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
import javax.cache.Cache;
import javax.cache.CacheException;
import javax.cache.integration.CompletionListener;
@@ -590,6 +591,21 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
Ignite crd = null;
for (Ignite g : G.allGrids()) {
+ if (nodes != null) {
+ Set<UUID> gClusterNodeIds = g.cluster().nodes().stream()
+ .map(ClusterNode::id)
+ .collect(Collectors.toSet());
+
+ Set<UUID> awaitPmeNodeIds = nodes.stream()
+ .map(ClusterNode::id)
+ .collect(Collectors.toSet());
+
+ gClusterNodeIds.retainAll(awaitPmeNodeIds);
+
+ if (gClusterNodeIds.isEmpty())
+ continue; // Node g is from another cluster and can't be elected as coordinator.
+ }
+
ClusterNode node = g.cluster().localNode();
if (crd == null || node.order() < crd.cluster().localNode().order()) {
diff --git a/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java b/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
index 490f7f5..f62d110 100644
--- a/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
@@ -2747,7 +2747,7 @@ public class GridCommandHandlerTest extends GridCommonAbstractTest {
db.checkpointReadLock();
try {
- U.invoke(GridCacheDatabaseSharedManager.class, db, "applyUpdate", ctx, dataEntry);
+ U.invoke(GridCacheDatabaseSharedManager.class, db, "applyUpdate", ctx, dataEntry, false);
}
finally {
db.checkpointReadUnlock();
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ClusterReadOnlyModeSqlTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ClusterReadOnlyModeSqlTest.java
index 6fdb03a..c0ec2a6 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ClusterReadOnlyModeSqlTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ClusterReadOnlyModeSqlTest.java
@@ -59,7 +59,7 @@ public class ClusterReadOnlyModeSqlTest extends ClusterReadOnlyModeAbstractTest
cur.getAll();
}
- boolean failed = false;
+ Throwable failed = null;
try (FieldsQueryCursor<?> cur = cache.query(new SqlFieldsQuery("DELETE FROM Integer"))) {
cur.getAll();
@@ -68,13 +68,15 @@ public class ClusterReadOnlyModeSqlTest extends ClusterReadOnlyModeAbstractTest
if (!readOnly)
log.error("Failed to delete data", ex);
- failed = true;
+ failed = ex;
}
- if (failed != readOnly)
+ if ((failed == null) == readOnly)
fail("SQL delete from " + cacheName + " must " + (readOnly ? "fail" : "succeed"));
- failed = false;
+ checkThatRootCauseIsReadOnly(failed);
+
+ failed = null;
try (FieldsQueryCursor<?> cur = cache.query(new SqlFieldsQuery(
"INSERT INTO Integer(_KEY, _VAL) VALUES (?, ?)").setArgs(rnd.nextInt(1000), rnd.nextInt()))) {
@@ -84,11 +86,13 @@ public class ClusterReadOnlyModeSqlTest extends ClusterReadOnlyModeAbstractTest
if (!readOnly)
log.error("Failed to insert data", ex);
- failed = true;
+ failed = ex;
}
- if (failed != readOnly)
+ if ((failed == null) == readOnly)
fail("SQL insert into " + cacheName + " must " + (readOnly ? "fail" : "succeed"));
+
+ checkThatRootCauseIsReadOnly(failed);
}
}
}
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
index 5651060..579eace 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
@@ -410,6 +410,17 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements IgniteDis
return impl.gridStartTime();
}
+
+ /**
+ * Sets grid start time.
+ *
+ * @param val New time value.
+ */
+ @SuppressWarnings("unused")
+ public void setGridStartTime(long val) {
+ impl.setGridStartTime(val);
+ }
+
/** {@inheritDoc} */
@Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) {
IgniteDiscoverySpiInternalListener internalLsnr = impl.internalLsnr;
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
index 92c9658..f894a75 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
@@ -47,7 +47,7 @@ class ZkRuntimeState {
int joinDataPartCnt;
/** */
- long gridStartTime;
+ volatile long gridStartTime;
/** */
volatile boolean joined;
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index c00c637..95fadb7 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -705,6 +705,15 @@ public class ZookeeperDiscoveryImpl {
}
/**
+ * Sets grid start time.
+ *
+ * @param val New time value.
+ */
+ public void setGridStartTime(long val) {
+ rtState.gridStartTime = val;
+ }
+
+ /**
* Starts join procedure and waits for {@link EventType#EVT_NODE_JOINED} event for local node.
*
* @throws InterruptedException If interrupted.
diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestUtil.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestUtil.java
index 9913f61..36b04d9a 100644
--- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestUtil.java
+++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestUtil.java
@@ -32,6 +32,15 @@ public class ZookeeperDiscoverySpiTestUtil {
* @return Test cluster.
*/
public static TestingCluster createTestingCluster(int instances) {
+ return createTestingCluster(instances, 0);
+ }
+
+ /**
+ * @param instances Number of instances in the test cluster.
+ * @param firstInstanceIdx First instance index.
+ * @return Test cluster.
+ */
+ public static TestingCluster createTestingCluster(int instances, int firstInstanceIdx) {
String tmpDir;
tmpDir = System.getenv("TMPFS_ROOT") != null
@@ -39,7 +48,7 @@ public class ZookeeperDiscoverySpiTestUtil {
List<InstanceSpec> specs = new ArrayList<>();
- for (int i = 0; i < instances; i++) {
+ for (int i = firstInstanceIdx, n = firstInstanceIdx + instances; i < n; i++) {
File file = new File(tmpDir, "apacheIgniteTestZk-" + i);
if (file.isDirectory())
diff --git a/parent/pom.xml b/parent/pom.xml
index 3546ebe..8564221 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -490,7 +490,7 @@
<packages>org.apache.ignite.springdata20.repository*</packages>
</group>
<group>
- <title>RocketMQ integration</title>
+ <title>RocketMQ integration</title>
<packages>org.apache.ignite.stream.rocketmq*</packages>
</group>
<group>