You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sa...@apache.org on 2022/10/27 17:52:34 UTC
[ignite-3] branch main updated: IGNITE-17263 Leader to replica safe time propagation implemented (#1177)
This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new fe54738e4d IGNITE-17263 Leader to replica safe time propagation implemented (#1177)
fe54738e4d is described below
commit fe54738e4dce6c0538e0de06f460dd37c2f46d57
Author: Denis Chudov <mo...@gmail.com>
AuthorDate: Thu Oct 27 20:52:29 2022 +0300
IGNITE-17263 Leader to replica safe time propagation implemented (#1177)
---
.../internal/cluster/management/MockNode.java | 4 +-
.../management/raft/ItCmgRaftServiceTest.java | 4 +-
.../apache/ignite/internal/hlc/HybridClock.java | 72 +------
.../hlc/{HybridClock.java => HybridClockImpl.java} | 14 +-
.../util/PendingComparableValuesTracker.java | 132 +++++++++++++
.../apache/ignite/internal/HybridClockTest.java | 9 +-
.../apache/ignite/internal/TestHybridClock.java} | 58 +++---
.../util/PendingComparableValuesTrackerTest.java | 134 +++++++++++++
modules/raft/pom.xml | 6 +
.../ignite/internal/raft/ItLearnersTest.java | 4 +-
.../apache/ignite/internal/raft/ItLozaTest.java | 4 +-
.../internal/raft/ItRaftGroupServiceTest.java | 4 +-
.../apache/ignite/raft/jraft/core/ItNodeTest.java | 7 +-
.../raft/server/ItJraftCounterServerTest.java | 166 +---------------
.../apache/ignite/raft/server/ItSafeTimeTest.java | 142 +++++++++++++
.../ignite/raft/server/JraftAbstractTest.java | 219 +++++++++++++++++++++
.../java/org/apache/ignite/internal/raft/Loza.java | 24 +++
.../internal/raft/server/RaftGroupOptions.java | 19 ++
.../raft/server/ReplicationGroupOptions.java | 48 +++++
.../internal/raft/server/impl/JraftServerImpl.java | 4 +
.../ignite/raft/jraft/core/FSMCallerImpl.java | 6 +
.../apache/ignite/raft/jraft/core/NodeImpl.java | 14 ++
.../apache/ignite/raft/jraft/core/Replicator.java | 6 +-
.../ignite/raft/jraft/option/FSMCallerOptions.java | 10 +
.../ignite/raft/jraft/option/NodeOptions.java | 17 +-
.../raft/jraft/util/SafeTimeCandidateManager.java | 90 +++++++++
.../org/apache/ignite/internal/raft/LozaTest.java | 4 +-
.../ignite/raft/jraft/core/FSMCallerTest.java | 9 +
.../jraft/util/SafeTimeCandidatesManagerTest.java | 51 +++++
.../apache/ignite/internal/replicator/Replica.java | 8 +
.../ignite/internal/replicator/ReplicaManager.java | 43 +++-
.../replicator/command/SafeTimeSyncCommand.java | 26 +++
.../replicator/message/ReplicaMessageGroup.java | 3 +
...eGroup.java => ReplicaSafeTimeSyncRequest.java} | 22 +--
.../ItDistributedConfigurationPropertiesTest.java | 4 +-
.../ItDistributedConfigurationStorageTest.java | 4 +-
.../storage/ItRebalanceDistributedTest.java | 9 +-
.../runner/app/ItIgniteNodeRestartTest.java | 3 +-
.../internal/runner/app/ItTablesApiTest.java | 1 +
.../org/apache/ignite/internal/app/IgniteImpl.java | 3 +-
.../sql/engine/exec/MockedStructuresTest.java | 3 +-
.../engine/exec/rel/TableScanExecutionTest.java | 3 +-
.../storage/AbstractMvPartitionStorageTest.java | 3 +-
.../ItAbstractInternalTableScanTest.java | 3 +-
.../ItInternalTableReadOnlyOperationsTest.java | 3 +-
.../ItInternalTableReadOnlyScanTest.java | 3 +-
.../ignite/distributed/ItTablePersistenceTest.java | 9 +-
.../distributed/ItTxDistributedTestSingleNode.java | 35 +++-
.../ignite/internal/table/ItColocationTest.java | 3 +-
.../internal/table/distributed/TableManager.java | 28 ++-
.../table/distributed/raft/PartitionListener.java | 12 ++
.../replicator/PartitionReplicaListener.java | 163 ++++++++++-----
.../apache/ignite/internal/table/TxLocalTest.java | 4 +-
.../table/distributed/TableManagerTest.java | 3 +-
.../PartitionRaftCommandsSerializationTest.java | 5 +-
.../raft/PartitionCommandListenerTest.java | 5 +-
.../incoming/IncomingSnapshotCopierTest.java | 3 +-
.../snapshot/outgoing/OutgoingSnapshotTest.java | 3 +-
.../SnapshotAwarePartitionDataStorageTest.java | 6 +-
.../PartitionReplicaListenerIndexLockingTest.java | 8 +-
.../replication/PartitionReplicaListenerTest.java | 11 +-
.../table/impl/DummyInternalTableImpl.java | 14 +-
.../apache/ignite/internal/tx/TxManagerTest.java | 4 +-
63 files changed, 1320 insertions(+), 421 deletions(-)
diff --git a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/MockNode.java b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/MockNode.java
index 28c95c33b1..3c059d2ae9 100644
--- a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/MockNode.java
+++ b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/MockNode.java
@@ -25,7 +25,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.cluster.management.raft.RocksDbClusterStateStorage;
-import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.util.ReverseIterator;
@@ -74,7 +74,7 @@ public class MockNode {
this.clusterService = ClusterServiceTestUtils.clusterService(testInfo, port, nodeFinder);
- Loza raftManager = new Loza(clusterService, null, workDir, new HybridClock());
+ Loza raftManager = new Loza(clusterService, null, workDir, new HybridClockImpl());
this.clusterManager = new ClusterManagementGroupManager(
vaultManager,
diff --git a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
index 017abd973f..c62e079067 100644
--- a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
+++ b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
@@ -46,7 +46,7 @@ import org.apache.ignite.internal.cluster.management.raft.commands.JoinReadyComm
import org.apache.ignite.internal.cluster.management.raft.commands.JoinRequestCommand;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
-import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.properties.IgniteProductVersion;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
@@ -89,7 +89,7 @@ public class ItCmgRaftServiceTest {
Node(TestInfo testInfo, NetworkAddress addr, NodeFinder nodeFinder, Path workDir) {
this.clusterService = clusterService(testInfo, addr.port(), nodeFinder);
- this.raftManager = new Loza(clusterService, raftConfiguration, workDir, new HybridClock());
+ this.raftManager = new Loza(clusterService, raftConfiguration, workDir, new HybridClockImpl());
}
void start() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClock.java b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClock.java
index 591c96c516..1c7e4fc1f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClock.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClock.java
@@ -17,63 +17,16 @@
package org.apache.ignite.internal.hlc;
-import java.lang.invoke.MethodHandles;
-import java.lang.invoke.VarHandle;
-import java.time.Clock;
-import org.apache.ignite.internal.tostring.S;
-
/**
* A Hybrid Logical Clock.
*/
-public class HybridClock {
- /**
- * Var handle for {@link #latestTime}.
- */
- private static final VarHandle LATEST_TIME;
-
- static {
- try {
- LATEST_TIME = MethodHandles.lookup().findVarHandle(HybridClock.class, "latestTime", HybridTimestamp.class);
- } catch (NoSuchFieldException | IllegalAccessException e) {
- throw new ExceptionInInitializerError(e);
- }
- }
-
- /** Latest timestamp. */
- private volatile HybridTimestamp latestTime;
-
- /**
- * The constructor which initializes the latest time to current time by system clock.
- */
- public HybridClock() {
- this.latestTime = new HybridTimestamp(Clock.systemUTC().instant().toEpochMilli(), 0);
- }
-
+public interface HybridClock {
/**
* Creates a timestamp for new event.
*
* @return The hybrid timestamp.
*/
- public HybridTimestamp now() {
- while (true) {
- long currentTimeMillis = Clock.systemUTC().instant().toEpochMilli();
-
- // Read the latest time after accessing UTC time to reduce contention.
- HybridTimestamp latestTime = this.latestTime;
-
- HybridTimestamp newLatestTime;
-
- if (latestTime.getPhysical() >= currentTimeMillis) {
- newLatestTime = latestTime.addTicks(1);
- } else {
- newLatestTime = new HybridTimestamp(currentTimeMillis, 0);
- }
-
- if (LATEST_TIME.compareAndSet(this, latestTime, newLatestTime)) {
- return newLatestTime;
- }
- }
- }
+ HybridTimestamp now();
/**
* Creates a timestamp for a received event.
@@ -81,24 +34,5 @@ public class HybridClock {
* @param requestTime Timestamp from request.
* @return The hybrid timestamp.
*/
- public HybridTimestamp update(HybridTimestamp requestTime) {
- while (true) {
- HybridTimestamp now = new HybridTimestamp(Clock.systemUTC().instant().toEpochMilli(), -1);
-
- // Read the latest time after accessing UTC time to reduce contention.
- HybridTimestamp latestTime = this.latestTime;
-
- HybridTimestamp newLatestTime = HybridTimestamp.max(now, requestTime, latestTime).addTicks(1);
-
- if (LATEST_TIME.compareAndSet(this, latestTime, newLatestTime)) {
- return newLatestTime;
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public String toString() {
- return S.toString(HybridClock.class, this);
- }
+ HybridTimestamp update(HybridTimestamp requestTime);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClock.java b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java
similarity index 88%
copy from modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClock.java
copy to modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java
index 591c96c516..c5b256f75a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClock.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java
@@ -23,9 +23,9 @@ import java.time.Clock;
import org.apache.ignite.internal.tostring.S;
/**
- * A Hybrid Logical Clock.
+ * A Hybrid Logical Clock implementation.
*/
-public class HybridClock {
+public class HybridClockImpl implements HybridClock {
/**
* Var handle for {@link #latestTime}.
*/
@@ -33,19 +33,19 @@ public class HybridClock {
static {
try {
- LATEST_TIME = MethodHandles.lookup().findVarHandle(HybridClock.class, "latestTime", HybridTimestamp.class);
+ LATEST_TIME = MethodHandles.lookup().findVarHandle(HybridClockImpl.class, "latestTime", HybridTimestamp.class);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new ExceptionInInitializerError(e);
}
}
/** Latest timestamp. */
- private volatile HybridTimestamp latestTime;
+ protected volatile HybridTimestamp latestTime;
/**
* The constructor which initializes the latest time to current time by system clock.
*/
- public HybridClock() {
+ public HybridClockImpl() {
this.latestTime = new HybridTimestamp(Clock.systemUTC().instant().toEpochMilli(), 0);
}
@@ -88,7 +88,9 @@ public class HybridClock {
// Read the latest time after accessing UTC time to reduce contention.
HybridTimestamp latestTime = this.latestTime;
- HybridTimestamp newLatestTime = HybridTimestamp.max(now, requestTime, latestTime).addTicks(1);
+ HybridTimestamp maxLatestTime = HybridTimestamp.max(now, requestTime, latestTime);
+
+ HybridTimestamp newLatestTime = maxLatestTime.addTicks(1);
if (LATEST_TIME.compareAndSet(this, latestTime, newLatestTime)) {
return newLatestTime;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java
new file mode 100644
index 0000000000..a976c01e51
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tracker that stores comparable value internally, this value can grow when {@link #update(Comparable)} method is called. The tracker gives
+ * ability to wait for certain value, see {@link #waitFor(Comparable)}.
+ */
+public class PendingComparableValuesTracker<T extends Comparable<T>> {
+ /** Map of comparable values to corresponding futures. */
+ private final ConcurrentSkipListMap<T, Collection<CompletableFuture<Void>>> valueFutures = new ConcurrentSkipListMap<>();
+
+ /** Current value. */
+ public final AtomicReference<T> current;
+
+ /**
+ * Constructor with initial value.
+ *
+ * @param initialValue Initial value.
+ */
+ public PendingComparableValuesTracker(T initialValue) {
+ current = new AtomicReference<>(initialValue);
+ }
+
+ /**
+ * Updates the internal state, if it is lower than {@code newValue} and completes all futures waiting for {@code newValue}
+ * that had been created for corresponding values that are lower than the given one.
+ *
+ * @param newValue New value.
+ */
+ public void update(T newValue) {
+ for (Map.Entry<T, Collection<CompletableFuture<Void>>> e : valueFutures.entrySet()) {
+ if (newValue.compareTo(e.getKey()) >= 0) {
+ valueFutures.compute(e.getKey(), (k, v) -> {
+ if (v != null) {
+ v.forEach(f -> f.complete(null));
+ }
+
+ return null;
+ });
+ } else {
+ break;
+ }
+ }
+
+ while (true) {
+ T current = this.current.get();
+
+ if (newValue.compareTo(current) > 0) {
+ if (this.current.compareAndSet(current, newValue)) {
+ return;
+ }
+ } else {
+ return;
+ }
+ }
+ }
+
+ /**
+ * Provides the future that is completed when this tracker's internal value reaches given one. If the internal value is greater or
+ * equal then the given one, returns completed future.
+ *
+ * @param valueToWait Value to wait.
+ * @return Future.
+ */
+ public CompletableFuture<Void> waitFor(T valueToWait) {
+ if (current.get().compareTo(valueToWait) >= 0) {
+ return completedFuture(null);
+ }
+
+ CompletableFuture<Void> future = new CompletableFuture<>();
+
+ valueFutures.compute(valueToWait, (k, v) -> {
+ if (v == null) {
+ v = new ConcurrentLinkedDeque<>();
+ }
+
+ v.add(future);
+
+ return v;
+ });
+
+ if (current.get().compareTo(valueToWait) >= 0) {
+ future.complete(null);
+
+ valueFutures.compute(valueToWait, (k, v) -> {
+ if (v == null) {
+ return null;
+ } else {
+ v.remove(future);
+ }
+
+ return v;
+ });
+ }
+
+ return future;
+ }
+
+ /**
+ * Returns current internal value.
+ *
+ * @return Current value.
+ */
+ public T current() {
+ return current.get();
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/HybridClockTest.java b/modules/core/src/test/java/org/apache/ignite/internal/HybridClockTest.java
index 5442de531b..c5a5633e81 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/HybridClockTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/HybridClockTest.java
@@ -15,13 +15,16 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.hlc;
+package org.apache.ignite.internal;
import static org.apache.ignite.internal.hlc.HybridClockTestUtils.mockToEpochMilli;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.time.Clock;
import java.util.function.Supplier;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
@@ -48,7 +51,7 @@ class HybridClockTest {
public void testNow() {
clockMock = mockToEpochMilli(100);
- HybridClock clock = new HybridClock();
+ HybridClock clock = new HybridClockImpl();
assertTimestampEquals(100, new HybridTimestamp(100, 1), clock::now);
@@ -66,7 +69,7 @@ class HybridClockTest {
public void testTick() {
clockMock = mockToEpochMilli(100);
- HybridClock clock = new HybridClock();
+ HybridClock clock = new HybridClockImpl();
assertTimestampEquals(100, new HybridTimestamp(100, 1),
() -> clock.update(new HybridTimestamp(50, 1)));
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClock.java b/modules/core/src/test/java/org/apache/ignite/internal/TestHybridClock.java
similarity index 64%
copy from modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClock.java
copy to modules/core/src/test/java/org/apache/ignite/internal/TestHybridClock.java
index 591c96c516..f795d74fd5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClock.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/TestHybridClock.java
@@ -15,17 +15,24 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.hlc;
+package org.apache.ignite.internal;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
-import java.time.Clock;
-import org.apache.ignite.internal.tostring.S;
+import java.util.function.LongSupplier;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
/**
- * A Hybrid Logical Clock.
+ * Test hybrid clock with custom supplier of current time.
*/
-public class HybridClock {
+public class TestHybridClock implements HybridClock {
+ /** Supplier of current time in milliseconds. */
+ private final LongSupplier currentTimeMillisSupplier;
+
+ /** Latest time. */
+ private volatile HybridTimestamp latestTime;
+
/**
* Var handle for {@link #latestTime}.
*/
@@ -33,30 +40,22 @@ public class HybridClock {
static {
try {
- LATEST_TIME = MethodHandles.lookup().findVarHandle(HybridClock.class, "latestTime", HybridTimestamp.class);
+ LATEST_TIME = MethodHandles.lookup().findVarHandle(TestHybridClock.class, "latestTime", HybridTimestamp.class);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new ExceptionInInitializerError(e);
}
}
- /** Latest timestamp. */
- private volatile HybridTimestamp latestTime;
-
- /**
- * The constructor which initializes the latest time to current time by system clock.
- */
- public HybridClock() {
- this.latestTime = new HybridTimestamp(Clock.systemUTC().instant().toEpochMilli(), 0);
+ public TestHybridClock(LongSupplier currentTimeMillisSupplier) {
+ this.currentTimeMillisSupplier = currentTimeMillisSupplier;
+ this.latestTime = new HybridTimestamp(currentTimeMillisSupplier.getAsLong(), 0);
}
- /**
- * Creates a timestamp for new event.
- *
- * @return The hybrid timestamp.
- */
+ /** {@inheritDoc} */
+ @Override
public HybridTimestamp now() {
while (true) {
- long currentTimeMillis = Clock.systemUTC().instant().toEpochMilli();
+ long currentTimeMillis = currentTimeMillisSupplier.getAsLong();
// Read the latest time after accessing UTC time to reduce contention.
HybridTimestamp latestTime = this.latestTime;
@@ -75,18 +74,11 @@ public class HybridClock {
}
}
- /**
- * Creates a timestamp for a received event.
- *
- * @param requestTime Timestamp from request.
- * @return The hybrid timestamp.
- */
+ /** {@inheritDoc} */
+ @Override
public HybridTimestamp update(HybridTimestamp requestTime) {
while (true) {
- HybridTimestamp now = new HybridTimestamp(Clock.systemUTC().instant().toEpochMilli(), -1);
-
- // Read the latest time after accessing UTC time to reduce contention.
- HybridTimestamp latestTime = this.latestTime;
+ HybridTimestamp now = new HybridTimestamp(currentTimeMillisSupplier.getAsLong(), -1);
HybridTimestamp newLatestTime = HybridTimestamp.max(now, requestTime, latestTime).addTicks(1);
@@ -95,10 +87,4 @@ public class HybridClock {
}
}
}
-
- /** {@inheritDoc} */
- @Override
- public String toString() {
- return S.toString(HybridClock.class, this);
- }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/PendingComparableValuesTrackerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/PendingComparableValuesTrackerTest.java
new file mode 100644
index 0000000000..d61a211569
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/PendingComparableValuesTrackerTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.runMultiThreaded;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test for {@link PendingComparableValuesTracker}.
+ */
+public class PendingComparableValuesTrackerTest {
+ @Test
+ public void testSimpleWaitFor() {
+ HybridTimestamp ts = new HybridTimestamp(1, 0);
+
+ PendingComparableValuesTracker<HybridTimestamp> tracker = new PendingComparableValuesTracker<>(ts);
+
+ HybridTimestamp ts1 = new HybridTimestamp(ts.getPhysical() + 1_000_000, 0);
+ HybridTimestamp ts2 = new HybridTimestamp(ts.getPhysical() + 2_000_000, 0);
+ HybridTimestamp ts3 = new HybridTimestamp(ts.getPhysical() + 3_000_000, 0);
+
+ CompletableFuture<Void> f0 = tracker.waitFor(ts1);
+ CompletableFuture<Void> f1 = tracker.waitFor(ts2);
+ CompletableFuture<Void> f2 = tracker.waitFor(ts3);
+
+ assertFalse(f0.isDone());
+ assertFalse(f1.isDone());
+ assertFalse(f2.isDone());
+
+ tracker.update(ts1);
+ assertThat(f0, willCompleteSuccessfully());
+ assertFalse(f1.isDone());
+ assertFalse(f2.isDone());
+
+ tracker.update(ts2);
+ assertThat(f1, willCompleteSuccessfully());
+ assertFalse(f2.isDone());
+
+ tracker.update(ts3);
+ assertThat(f2, willCompleteSuccessfully());
+ }
+
+ @Test
+ public void testMultithreadedWaitFor() throws Exception {
+ HybridClock clock = new HybridClockImpl();
+
+ PendingComparableValuesTracker<HybridTimestamp> tracker = new PendingComparableValuesTracker<>(clock.now());
+
+ int threads = Runtime.getRuntime().availableProcessors() * 2;
+
+ Collection<IgniteBiTuple<CompletableFuture<Void>, HybridTimestamp>> allFutures = new ConcurrentLinkedDeque<>();
+
+ int iterations = 10_000;
+
+ runMultiThreaded(
+ () -> {
+ List<IgniteBiTuple<CompletableFuture<Void>, HybridTimestamp>> prevFutures = new ArrayList<>();
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+
+ for (int i = 0; i < iterations; i++) {
+ HybridTimestamp now = clock.now();
+ tracker.update(now);
+ HybridTimestamp timestampToWait =
+ new HybridTimestamp(now.getPhysical() + 1, now.getLogical() + random.nextInt(1000));
+
+ CompletableFuture<Void> future = tracker.waitFor(timestampToWait);
+
+ IgniteBiTuple<CompletableFuture<Void>, HybridTimestamp> pair = new IgniteBiTuple<>(future, timestampToWait);
+
+ prevFutures.add(pair);
+ allFutures.add(pair);
+
+ if (i % 10 == 0) {
+ for (Iterator<IgniteBiTuple<CompletableFuture<Void>, HybridTimestamp>> it = prevFutures.iterator();
+ it.hasNext();) {
+ IgniteBiTuple<CompletableFuture<Void>, HybridTimestamp> t = it.next();
+
+ if (t.get2().compareTo(now) <= 0) {
+ assertTrue(t.get1().isDone(), "now=" + now + ", ts=" + t.get2() + ", trackerTs=" + tracker.current());
+
+ it.remove();
+ }
+ }
+ }
+ }
+
+ return null;
+ },
+ threads,
+ "trackableHybridClockTest"
+ );
+
+ Thread.sleep(5);
+
+ tracker.update(clock.now());
+
+ List<IgniteBiTuple<CompletableFuture<Void>, HybridTimestamp>> uncompleted =
+ allFutures.stream().filter(f -> !f.get1().isDone()).collect(toList());
+
+ assertTrue(uncompleted.isEmpty());
+ }
+}
diff --git a/modules/raft/pom.xml b/modules/raft/pom.xml
index bd68cef47c..7f697ac57f 100644
--- a/modules/raft/pom.xml
+++ b/modules/raft/pom.xml
@@ -106,6 +106,12 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-inline</artifactId>
+ <scope>test</scope>
+ </dependency>
+
<!-- Benchmark dependencies -->
<dependency>
<groupId>org.openjdk.jmh</groupId>
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLearnersTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLearnersTest.java
index 804903d7c7..8ae9812370 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLearnersTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLearnersTest.java
@@ -47,7 +47,7 @@ import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
-import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.raft.server.RaftGroupOptions;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
@@ -111,7 +111,7 @@ public class ItLearnersTest extends IgniteAbstractTest {
Path raftDir = workDir.resolve(clusterService.localConfiguration().getName());
- loza = new Loza(clusterService, raftConfiguration, raftDir, new HybridClock());
+ loza = new Loza(clusterService, raftConfiguration, raftDir, new HybridClockImpl());
}
ClusterNode localMember() {
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java
index b4d247f228..90810cbdc0 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java
@@ -38,7 +38,7 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
-import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.testframework.WorkDirectory;
@@ -126,7 +126,7 @@ public class ItLozaTest {
CompletableFuture<NetworkMessage> exception = CompletableFuture.failedFuture(new IOException());
- loza = new Loza(service, raftConfiguration, dataPath, new HybridClock());
+ loza = new Loza(service, raftConfiguration, dataPath, new HybridClockImpl());
loza.start();
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
index 2ba4b40c49..b155af00b8 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
@@ -34,7 +34,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
-import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.testframework.WorkDirectory;
@@ -102,7 +102,7 @@ public class ItRaftGroupServiceTest {
CompletableFuture<RaftGroupService>[] svcFutures = new CompletableFuture[NODES_CNT];
for (int i = 0; i < NODES_CNT; i++) {
- Loza raftServer = new Loza(clusterServices.get(i), raftConfiguration, workDir.resolve("node" + i), new HybridClock());
+ Loza raftServer = new Loza(clusterServices.get(i), raftConfiguration, workDir.resolve("node" + i), new HybridClockImpl());
raftSrvs.add(raftServer);
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
index 55210e4cd3..5df597fb0b 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
@@ -68,6 +68,7 @@ import java.util.function.BiPredicate;
import java.util.function.BooleanSupplier;
import java.util.stream.IntStream;
import java.util.stream.Stream;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
@@ -3779,7 +3780,7 @@ public class ItNodeTest {
for (PeerId peer : peers) {
RaftOptions opts = new RaftOptions();
opts.setElectionHeartbeatFactor(4); // Election timeout divisor.
- HybridClock clock = new HybridClock();
+ HybridClock clock = new HybridClockImpl();
assertTrue(cluster.start(peer.getEndpoint(), false, 300, false, null, opts, clock));
}
@@ -3816,11 +3817,11 @@ public class ItNodeTest {
if (msg.entriesList() == null && msg.data() == null) {
heartbeatRequest.set(true);
- assertTrue(msg.timestamp() != null);
} else {
appendEntriesRequest.set(true);
- assertTrue(msg.timestamp() == null);
}
+
+ assertTrue(msg.timestamp() != null);
} else if (msgs[0] instanceof AppendEntriesResponseImpl) {
AppendEntriesResponseImpl msg = (AppendEntriesResponseImpl) msgs[0];
if (msg.timestamp() == null) {
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
index 3a16b68619..ede4466f8b 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
@@ -23,7 +23,6 @@ import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.raft.server.RaftGroupOptions.defaults;
import static org.apache.ignite.raft.jraft.core.State.STATE_ERROR;
import static org.apache.ignite.raft.jraft.core.State.STATE_LEADER;
-import static org.apache.ignite.raft.jraft.test.TestUtils.getLocalAddress;
import static org.apache.ignite.raft.jraft.test.TestUtils.waitForCondition;
import static org.apache.ignite.raft.jraft.test.TestUtils.waitForTopology;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -43,28 +42,16 @@ import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
import java.util.stream.Stream;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.server.RaftServer;
import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
-import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
-import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteInternalException;
-import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.ReadCommand;
@@ -76,7 +63,6 @@ import org.apache.ignite.raft.jraft.core.StateMachineAdapter;
import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.rpc.impl.RaftException;
-import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupServiceImpl;
import org.apache.ignite.raft.jraft.test.TestUtils;
import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
import org.apache.ignite.raft.server.counter.CounterListener;
@@ -95,12 +81,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
* Jraft server.
*/
@ExtendWith(WorkDirectoryExtension.class)
-class ItJraftCounterServerTest extends RaftServerAbstractTest {
- /**
- * The logger.
- */
- private static final IgniteLogger LOG = Loggers.forClass(ItJraftCounterServerTest.class);
-
+class ItJraftCounterServerTest extends JraftAbstractTest {
/**
* Counter group name 0.
*/
@@ -111,56 +92,20 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
*/
private static final TestReplicationGroupId COUNTER_GROUP_1 = new TestReplicationGroupId("counter1");
- /**
- * The server port offset.
- */
- private static final int PORT = 5003;
-
- /**
- * The client port offset.
- */
- private static final int CLIENT_PORT = 6003;
-
- /**
- * Initial configuration.
- */
- private static final List<Peer> INITIAL_CONF = IntStream.rangeClosed(0, 2)
- .mapToObj(i -> new NetworkAddress(getLocalAddress(), PORT + i))
- .map(Peer::new)
- .collect(Collectors.toUnmodifiableList());
-
/**
* Listener factory.
*/
private Supplier<CounterListener> listenerFactory = CounterListener::new;
- /**
- * Servers list.
- */
- private final List<JraftServerImpl> servers = new ArrayList<>();
-
- /**
- * Clients list.
- */
- private final List<RaftGroupService> clients = new ArrayList<>();
-
- /**
- * Data path.
- */
- @WorkDirectory
- private Path dataPath;
-
- /** Executor for raft group services. */
- private ScheduledExecutorService executor;
-
/**
* Before each.
*/
@BeforeEach
+ @Override
void before() {
LOG.info(">>>>>>>>>>>>>>> Start test method: {}", testInfo.getTestMethod().orElseThrow().getName());
- executor = new ScheduledThreadPoolExecutor(20, new NamedThreadFactory(Loza.CLIENT_POOL_NAME, LOG));
+ super.before();
}
/**
@@ -171,112 +116,9 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
protected void after() throws Exception {
super.after();
- shutdownCluster();
-
- IgniteUtils.shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS);
-
- TestUtils.assertAllJraftThreadsStopped();
-
LOG.info(">>>>>>>>>>>>>>> End test method: {}", testInfo.getTestMethod().orElseThrow().getName());
}
- private void shutdownCluster() throws Exception {
- LOG.info("Start client shutdown");
-
- Iterator<RaftGroupService> iterClients = clients.iterator();
-
- while (iterClients.hasNext()) {
- RaftGroupService client = iterClients.next();
-
- iterClients.remove();
-
- client.shutdown();
- }
-
- clients.clear();
-
- LOG.info("Start server shutdown servers={}", servers.size());
-
- Iterator<JraftServerImpl> iterSrv = servers.iterator();
-
- while (iterSrv.hasNext()) {
- JraftServerImpl server = iterSrv.next();
-
- iterSrv.remove();
-
- Set<ReplicationGroupId> grps = server.startedGroups();
-
- for (ReplicationGroupId grp : grps) {
- server.stopRaftGroup(grp);
- }
-
- server.beforeNodeStop();
-
- server.stop();
- }
-
- servers.clear();
- }
-
- /**
- * Starts server.
- *
- * @param idx The index.
- * @param clo Init closure.
- * @param cons Node options updater.
- * @return Raft server instance.
- */
- private JraftServerImpl startServer(int idx, Consumer<RaftServer> clo, Consumer<NodeOptions> cons) {
- var addr = new NetworkAddress(getLocalAddress(), PORT);
-
- ClusterService service = clusterService(PORT + idx, List.of(addr), true);
-
- NodeOptions opts = new NodeOptions();
-
- cons.accept(opts);
-
- JraftServerImpl server = new JraftServerImpl(service, dataPath.resolve("node" + idx), opts) {
- @Override
- public void stop() throws Exception {
- servers.remove(this);
-
- super.stop();
-
- service.stop();
- }
- };
-
- server.start();
-
- clo.accept(server);
-
- servers.add(server);
-
- assertTrue(waitForTopology(service, servers.size(), 15_000));
-
- return server;
- }
-
- /**
- * Starts client.
- *
- * @param groupId Group id.
- * @return The client.
- * @throws Exception If failed.
- */
- private RaftGroupService startClient(TestReplicationGroupId groupId) throws Exception {
- var addr = new NetworkAddress(getLocalAddress(), PORT);
-
- ClusterService clientNode = clusterService(CLIENT_PORT + clients.size(), List.of(addr), true);
-
- RaftGroupService client = RaftGroupServiceImpl.start(groupId, clientNode, FACTORY, 10_000,
- List.of(new Peer(addr)), false, 200, executor).get(3, TimeUnit.SECONDS);
-
- clients.add(client);
-
- return client;
- }
-
/**
* Starts a cluster for the test.
*
@@ -294,8 +136,6 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
startClient(COUNTER_GROUP_1);
}
-
-
/**
* Checks that the number of Disruptor threads does not depend on count started RAFT nodes.
*/
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSafeTimeTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSafeTimeTest.java
new file mode 100644
index 0000000000..80e3377919
--- /dev/null
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSafeTimeTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.server;
+
+import static org.apache.ignite.internal.raft.server.RaftGroupOptions.defaults;
+import static org.apache.ignite.raft.jraft.test.TestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.internal.TestHybridClock;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.raft.server.RaftGroupOptions;
+import org.apache.ignite.internal.raft.server.ReplicationGroupOptions;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.raft.client.Peer;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.apache.ignite.raft.server.counter.CounterListener;
+import org.apache.ignite.raft.server.counter.IncrementAndGetCommand;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Integration test for checking safe time propagation.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+public class ItSafeTimeTest extends JraftAbstractTest {
+ /** Raft group id. */
+ private static final ReplicationGroupId RAFT_GROUP_ID = new TestReplicationGroupId("testGroup");
+
+ /** Nodes count. */
+ private static final int NODES = 3;
+
+ /** Hybrid clocks. */
+ private List<HybridClock> clocks = new ArrayList<>();
+
+ /** Safe tims clocks. */
+ private List<PendingComparableValuesTracker<HybridTimestamp>> safeTimeContainers = new ArrayList<>();
+
+ /** Before each. */
+ @BeforeEach
+ @Override
+ void before() {
+ LOG.info(">>>>>>>>>>>>>>> Start test method: {}", testInfo.getTestMethod().orElseThrow().getName());
+
+ super.before();
+ }
+
+ /** After each. */
+ @AfterEach
+ @Override
+ protected void after() throws Exception {
+ super.after();
+
+ LOG.info(">>>>>>>>>>>>>>> End test method: {}", testInfo.getTestMethod().orElseThrow().getName());
+ }
+
+ /**
+ * Starts a cluster for the test.
+ *
+ * @throws Exception If failed.
+ */
+ private void startCluster() throws Exception {
+ for (int i = 0; i < NODES; i++) {
+ HybridClock clock = new TestHybridClock(() -> 1L);
+ PendingComparableValuesTracker<HybridTimestamp> safeTime = new PendingComparableValuesTracker<>(clock.now());
+
+ clocks.add(clock);
+ safeTimeContainers.add(safeTime);
+
+ startServer(i,
+ raftServer -> {
+ RaftGroupOptions groupOptions = defaults()
+ .replicationGroupOptions(new ReplicationGroupOptions().safeTime(safeTime));
+
+ raftServer.startRaftGroup(RAFT_GROUP_ID, new CounterListener(), INITIAL_CONF, groupOptions);
+ },
+ opts -> {
+ opts.setClock(clock);
+ opts.setSafeTimeTracker(safeTime);
+ }
+ );
+ }
+
+ startClient(RAFT_GROUP_ID);
+ }
+
+ /**
+ * Tests if a raft group become unavailable in case of a critical error.
+ */
+ @Test
+ public void test() throws Exception {
+ startCluster();
+
+ RaftGroupService client1 = clients.get(0);
+
+ client1.refreshLeader().get();
+ Peer leader = client1.leader();
+
+ final int leaderIndex = INITIAL_CONF.indexOf(leader);
+
+ assertTrue(leaderIndex >= 0);
+
+ final long leaderPhysicalTime = 100;
+
+ clocks.get(leaderIndex).update(new HybridTimestamp(leaderPhysicalTime, 0));
+
+ client1.run(new IncrementAndGetCommand(1)).get();
+
+ waitForCondition(() -> {
+ for (int i = 0; i < NODES; i++) {
+ // As current time provider for safe time clocks always returns 1,
+ // the only way for physical component to reach leaderPhysicalTime is safe time propagation mechanism.
+ if (i != leaderIndex && safeTimeContainers.get(i).current().getPhysical() != leaderPhysicalTime) {
+ return false;
+ }
+ }
+
+ return true;
+ }, 2000);
+ }
+}
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/JraftAbstractTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/JraftAbstractTest.java
new file mode 100644
index 0000000000..f6ca5bbf5f
--- /dev/null
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/JraftAbstractTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.server;
+
+import static org.apache.ignite.raft.jraft.test.TestUtils.getLocalAddress;
+import static org.apache.ignite.raft.jraft.test.TestUtils.waitForTopology;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.server.RaftServer;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.raft.client.Peer;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.apache.ignite.raft.jraft.option.NodeOptions;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupServiceImpl;
+import org.apache.ignite.raft.jraft.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+
+/**
+ * Abstract class for raft tests using JRaftServer.
+ */
+public abstract class JraftAbstractTest extends RaftServerAbstractTest {
+ /**
+ * The logger.
+ */
+ protected static final IgniteLogger LOG = Loggers.forClass(ItJraftCounterServerTest.class);
+
+ /**
+ * The server port offset.
+ */
+ protected static final int PORT = 5003;
+
+ /**
+ * The client port offset.
+ */
+ private static final int CLIENT_PORT = 6003;
+
+ /**
+ * Initial configuration.
+ */
+ protected static final List<Peer> INITIAL_CONF = IntStream.rangeClosed(0, 2)
+ .mapToObj(i -> new NetworkAddress(getLocalAddress(), PORT + i))
+ .map(Peer::new)
+ .collect(Collectors.toUnmodifiableList());
+
+ /**
+ * Servers list.
+ */
+ protected final List<JraftServerImpl> servers = new ArrayList<>();
+
+ /**
+ * Clients list.
+ */
+ protected final List<RaftGroupService> clients = new ArrayList<>();
+
+ /**
+ * Data path.
+ */
+ @WorkDirectory
+ protected Path dataPath;
+
+ /** Executor for raft group services. */
+ private ScheduledExecutorService executor;
+
+ /**
+ * Before each.
+ */
+ @BeforeEach
+ void before() {
+ executor = new ScheduledThreadPoolExecutor(20, new NamedThreadFactory(Loza.CLIENT_POOL_NAME, LOG));
+ }
+
+ /**
+ * After each.
+ */
+ @AfterEach
+ @Override
+ protected void after() throws Exception {
+ super.after();
+
+ shutdownCluster();
+
+ IgniteUtils.shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS);
+
+ TestUtils.assertAllJraftThreadsStopped();
+ }
+
+ protected void shutdownCluster() throws Exception {
+ LOG.info("Start client shutdown");
+
+ Iterator<RaftGroupService> iterClients = clients.iterator();
+
+ while (iterClients.hasNext()) {
+ RaftGroupService client = iterClients.next();
+
+ iterClients.remove();
+
+ client.shutdown();
+ }
+
+ clients.clear();
+
+ LOG.info("Start server shutdown servers={}", servers.size());
+
+ Iterator<JraftServerImpl> iterSrv = servers.iterator();
+
+ while (iterSrv.hasNext()) {
+ JraftServerImpl server = iterSrv.next();
+
+ iterSrv.remove();
+
+ Set<ReplicationGroupId> grps = server.startedGroups();
+
+ for (ReplicationGroupId grp : grps) {
+ server.stopRaftGroup(grp);
+ }
+
+ server.beforeNodeStop();
+
+ server.stop();
+ }
+
+ servers.clear();
+ }
+
+ /**
+ * Starts server.
+ *
+ * @param idx The index.
+ * @param clo Init closure.
+ * @param optionsUpdater Node options updater.
+ * @return Raft server instance.
+ */
+ protected JraftServerImpl startServer(int idx, Consumer<RaftServer> clo, Consumer<NodeOptions> optionsUpdater) {
+ var addr = new NetworkAddress(getLocalAddress(), PORT);
+
+ ClusterService service = clusterService(PORT + idx, List.of(addr), true);
+
+ NodeOptions opts = new NodeOptions();
+
+ optionsUpdater.accept(opts);
+
+ JraftServerImpl server = new JraftServerImpl(service, dataPath.resolve("node" + idx), opts) {
+ @Override
+ public void stop() throws Exception {
+ servers.remove(this);
+
+ super.stop();
+
+ service.stop();
+ }
+ };
+
+ server.start();
+
+ clo.accept(server);
+
+ servers.add(server);
+
+ assertTrue(waitForTopology(service, servers.size(), 15_000));
+
+ return server;
+ }
+
+ /**
+ * Starts client.
+ *
+ * @param groupId Group id.
+ * @return The client.
+ * @throws Exception If failed.
+ */
+ protected RaftGroupService startClient(ReplicationGroupId groupId) throws Exception {
+ var addr = new NetworkAddress(getLocalAddress(), PORT);
+
+ ClusterService clientNode = clusterService(CLIENT_PORT + clients.size(), List.of(addr), true);
+
+ RaftGroupService client = RaftGroupServiceImpl.start(groupId, clientNode, FACTORY, 10_000,
+ List.of(new Peer(addr)), false, 200, executor).get(3, TimeUnit.SECONDS);
+
+ clients.add(client);
+
+ return client;
+ }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index a2abb314da..a60b05d246 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.IgniteComponent;
@@ -44,6 +45,7 @@ import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteStringFormatter;
import org.apache.ignite.lang.NodeStoppingException;
@@ -58,6 +60,7 @@ import org.apache.ignite.raft.jraft.RaftMessagesFactory;
import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupServiceImpl;
import org.apache.ignite.raft.jraft.util.Utils;
+import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
/**
@@ -107,16 +110,37 @@ public class Loza implements IgniteComponent {
* The constructor.
*
* @param clusterNetSvc Cluster network service.
+ * @param raftConfiguration Raft configuration.
* @param dataPath Data path.
* @param clock A hybrid logical clock.
*/
public Loza(ClusterService clusterNetSvc, RaftConfiguration raftConfiguration, Path dataPath, HybridClock clock) {
+ this(clusterNetSvc, raftConfiguration, dataPath, clock, null);
+ }
+
+ /**
+ * The constructor.
+ *
+ * @param clusterNetSvc Cluster network service.
+ * @param raftConfiguration Raft configuration.
+ * @param dataPath Data path.
+ * @param clock A hybrid logical clock.
+ * @param safeTimeTracker Safe time tracker.
+ */
+ public Loza(
+ ClusterService clusterNetSvc,
+ RaftConfiguration raftConfiguration,
+ Path dataPath,
+ HybridClock clock,
+ @Nullable PendingComparableValuesTracker<HybridTimestamp> safeTimeTracker
+ ) {
this.clusterNetSvc = clusterNetSvc;
this.raftConfiguration = raftConfiguration;
NodeOptions options = new NodeOptions();
options.setClock(clock);
+ options.setSafeTimeTracker(safeTimeTracker);
this.raftServer = new JraftServerImpl(clusterNetSvc, dataPath, options);
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
index d074ae35e8..f9b4154ffe 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
@@ -37,6 +37,9 @@ public class RaftGroupOptions {
/** Raft meta storage factory. */
private RaftMetaStorageFactory raftMetaStorageFactory;
+ /** Options that are specific for replication group. */
+ private ReplicationGroupOptions replicationGroupOptions;
+
/**
* Returns default options as defined by classic Raft (so stores are persistent).
*
@@ -126,4 +129,20 @@ public class RaftGroupOptions {
return this;
}
+
+ /**
+ * Replication group options.
+ */
+ public ReplicationGroupOptions replicationGroupOptions() {
+ return replicationGroupOptions;
+ }
+
+ /**
+ * Set the replication group options.
+ */
+ public RaftGroupOptions replicationGroupOptions(ReplicationGroupOptions replicationGroupOptions) {
+ this.replicationGroupOptions = replicationGroupOptions;
+
+ return this;
+ }
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/ReplicationGroupOptions.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/ReplicationGroupOptions.java
new file mode 100644
index 0000000000..6301ce8711
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/ReplicationGroupOptions.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.server;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+
+/**
+ * Options that are specific for replication group.
+ */
+public class ReplicationGroupOptions {
+ /** Safe time. */
+ private PendingComparableValuesTracker<HybridTimestamp> safeTime;
+
+ /**
+ * Safe time.
+ */
+ public PendingComparableValuesTracker<HybridTimestamp> safeTime() {
+ return safeTime;
+ }
+
+ /**
+ * Set the safe time clock.
+ *
+ * @param safeTime Safe time.
+ * @return This, for chaining.
+ */
+ public ReplicationGroupOptions safeTime(PendingComparableValuesTracker<HybridTimestamp> safeTime) {
+ this.safeTime = safeTime;
+
+ return this;
+ }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
index d3c008a7e2..830f47bd77 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
@@ -414,6 +414,10 @@ public class JraftServerImpl implements RaftServer {
nodeOptions.setRpcClient(client);
+ if (groupOptions.replicationGroupOptions() != null) {
+ nodeOptions.setSafeTimeTracker(groupOptions.replicationGroupOptions().safeTime());
+ }
+
NetworkAddress addr = service.topologyService().localMember().address();
var peerId = new PeerId(addr.host(), addr.port(), 0, ElectionPriority.DISABLED);
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
index b95f4ca175..002c7d3caa 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
@@ -58,6 +58,7 @@ import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter;
import org.apache.ignite.raft.jraft.util.DisruptorMetricSet;
import org.apache.ignite.raft.jraft.util.OnlyForTest;
import org.apache.ignite.raft.jraft.util.Requires;
+import org.apache.ignite.raft.jraft.util.SafeTimeCandidateManager;
import org.apache.ignite.raft.jraft.util.Utils;
/**
@@ -155,6 +156,7 @@ public class FSMCallerImpl implements FSMCaller {
private NodeMetrics nodeMetrics;
private final CopyOnWriteArrayList<LastAppliedLogIndexListener> lastAppliedLogIndexListeners = new CopyOnWriteArrayList<>();
private RaftMessagesFactory msgFactory;
+ private SafeTimeCandidateManager safeTimeCandidateManager;
public FSMCallerImpl() {
super();
@@ -186,6 +188,7 @@ public class FSMCallerImpl implements FSMCaller {
}
this.error = new RaftException(ErrorType.ERROR_TYPE_NONE);
this.msgFactory = opts.getRaftMessagesFactory();
+ this.safeTimeCandidateManager = opts.getSafeTimeCandidateManager();
LOG.info("Starts FSMCaller successfully.");
return true;
}
@@ -524,6 +527,9 @@ public class FSMCallerImpl implements FSMCaller {
this.lastAppliedTerm = lastTerm;
this.logManager.setAppliedId(lastAppliedId);
notifyLastAppliedIndexUpdated(lastIndex);
+ if (safeTimeCandidateManager != null) {
+ safeTimeCandidateManager.commitIndex(lastAppliedIndex, lastIndex, lastTerm);
+ }
}
finally {
this.nodeMetrics.recordLatency("fsm-commit", Utils.monotonicMs() - startMs);
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 064546cb0f..3d63f790db 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -116,6 +116,7 @@ import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
import org.apache.ignite.raft.jraft.util.OnlyForTest;
import org.apache.ignite.raft.jraft.util.RepeatedTimer;
import org.apache.ignite.raft.jraft.util.Requires;
+import org.apache.ignite.raft.jraft.util.SafeTimeCandidateManager;
import org.apache.ignite.raft.jraft.util.StringUtils;
import org.apache.ignite.raft.jraft.util.SystemPropertyUtil;
import org.apache.ignite.raft.jraft.util.ThreadHelper;
@@ -137,6 +138,8 @@ public class NodeImpl implements Node, RaftServerService {
private volatile HybridClock clock;
+ private volatile SafeTimeCandidateManager safeTimeCandidateManager;
+
/**
* Internal states
*/
@@ -822,6 +825,7 @@ public class NodeImpl implements Node, RaftServerService {
opts.setRaftMessagesFactory(raftOptions.getRaftMessagesFactory());
opts.setfSMCallerExecutorDisruptor(options.getfSMCallerExecutorDisruptor());
opts.setGroupId(groupId);
+ opts.setSafeTimeCandidateManager(safeTimeCandidateManager);
return this.fsmCaller.init(opts);
}
@@ -857,6 +861,9 @@ public class NodeImpl implements Node, RaftServerService {
Requires.requireNonNull(opts.getServiceFactory(), "Null jraft service factory");
this.serviceFactory = opts.getServiceFactory();
this.clock = opts.getNodeOptions().getClock();
+ if (opts.getNodeOptions().getSafeTimeTracker() != null) {
+ this.safeTimeCandidateManager = new SafeTimeCandidateManager(opts.getNodeOptions().getSafeTimeTracker());
+ }
// Term is not an option since changing it is very dangerous
final long bootstrapLogTerm = opts.getLastLogIndex() > 0 ? 1 : 0;
final LogId bootstrapId = new LogId(opts.getLastLogIndex(), bootstrapLogTerm);
@@ -955,6 +962,9 @@ public class NodeImpl implements Node, RaftServerService {
Requires.requireNonNull(opts.getServiceFactory(), "Null jraft service factory");
this.serviceFactory = opts.getServiceFactory();
this.clock = opts.getClock();
+ if (opts.getSafeTimeTracker() != null) {
+ this.safeTimeCandidateManager = new SafeTimeCandidateManager(opts.getSafeTimeTracker());
+ }
this.options = opts;
this.raftOptions = opts.getRaftOptions();
this.metrics = new NodeMetrics(opts.isEnableMetrics());
@@ -2217,6 +2227,10 @@ public class NodeImpl implements Node, RaftServerService {
}
entries.add(logEntry);
}
+
+ if (safeTimeCandidateManager != null) {
+ safeTimeCandidateManager.addSafeTimeCandidate(index, request.term(), request.timestamp());
+ }
}
final FollowerStableClosure closure = new FollowerStableClosure(
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
index e029fd25ca..7d2a7402c3 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
@@ -671,9 +671,7 @@ public class Replicator implements ThreadId.OnError {
private void sendEmptyEntries(final boolean isHeartbeat,
final RpcResponseClosure<AppendEntriesResponse> heartBeatClosure) {
final AppendEntriesRequestBuilder rb = raftOptions.getRaftMessagesFactory().appendEntriesRequest();
- if (isHeartbeat) {
- rb.timestamp(options.getNode().clockNow());
- }
+ rb.timestamp(options.getNode().clockNow());
if (!fillCommonFields(rb, this.nextIndex - 1, isHeartbeat)) {
// id is unlock in installSnapshot
installSnapshot();
@@ -1589,6 +1587,8 @@ public class Replicator implements ThreadId.OnError {
RecycleUtil.recycle(byteBufList);
}
+ rb.timestamp(this.options.getNode().clockNow());
+
final AppendEntriesRequest request = rb.build();
if (LOG.isDebugEnabled()) {
LOG.debug(
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/FSMCallerOptions.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/FSMCallerOptions.java
index 53a78d0c95..7f37460c75 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/FSMCallerOptions.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/FSMCallerOptions.java
@@ -25,6 +25,7 @@ import org.apache.ignite.raft.jraft.core.NodeImpl;
import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
import org.apache.ignite.raft.jraft.entity.LogId;
import org.apache.ignite.raft.jraft.storage.LogManager;
+import org.apache.ignite.raft.jraft.util.SafeTimeCandidateManager;
/**
* FSM caller options.
@@ -41,6 +42,7 @@ public class FSMCallerOptions {
private NodeImpl node;
private RaftMessagesFactory raftMessagesFactory;
private StripedDisruptor<FSMCallerImpl.ApplyTask> fSMCallerExecutorDisruptor;
+ private SafeTimeCandidateManager safeTimeCandidateManager;
public String getGroupId() {
return groupId;
@@ -115,4 +117,12 @@ public class FSMCallerOptions {
public void setRaftMessagesFactory(RaftMessagesFactory raftMessagesFactory) {
this.raftMessagesFactory = raftMessagesFactory;
}
+
+ public SafeTimeCandidateManager getSafeTimeCandidateManager() {
+ return safeTimeCandidateManager;
+ }
+
+ public void setSafeTimeCandidateManager(SafeTimeCandidateManager safeTimeCandidateManager) {
+ this.safeTimeCandidateManager = safeTimeCandidateManager;
+ }
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
index 158efeeb95..565f8eab75 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
@@ -18,8 +18,11 @@ package org.apache.ignite.raft.jraft.option;
import java.util.List;
import java.util.concurrent.ExecutorService;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.raft.server.RaftGroupEventsListener;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.raft.jraft.JRaftServiceFactory;
import org.apache.ignite.raft.jraft.StateMachine;
import org.apache.ignite.raft.jraft.conf.Configuration;
@@ -234,7 +237,10 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
private StripedDisruptor<LogManagerImpl.StableClosureEvent> logManagerDisruptor;
/** A hybrid clock */
- private HybridClock clock = new HybridClock();
+ private HybridClock clock = new HybridClockImpl();
+
+ /** A container for safe time. */
+ private PendingComparableValuesTracker<HybridTimestamp> safeTimeTracker;
/**
* Amount of Disruptors that will handle the RAFT server.
@@ -597,6 +603,14 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
this.clock = clock;
}
+ public PendingComparableValuesTracker<HybridTimestamp> getSafeTimeTracker() {
+ return safeTimeTracker;
+ }
+
+ public void setSafeTimeTracker(PendingComparableValuesTracker<HybridTimestamp> safeTimeTracker) {
+ this.safeTimeTracker = safeTimeTracker;
+ }
+
@Override
public NodeOptions copy() {
final NodeOptions nodeOptions = new NodeOptions();
@@ -633,6 +647,7 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
nodeOptions.setRpcConnectTimeoutMs(this.getRpcConnectTimeoutMs());
nodeOptions.setElectionTimeoutStrategy(this.getElectionTimeoutStrategy());
nodeOptions.setClock(this.getClock());
+ nodeOptions.setSafeTimeTracker(this.getSafeTimeTracker());
return nodeOptions;
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/SafeTimeCandidateManager.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/SafeTimeCandidateManager.java
new file mode 100644
index 0000000000..01c2719339
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/SafeTimeCandidateManager.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.util;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+
+/**
+ * This manager stores safe time candidates coming with appendEntries requests, and applies them after committing corresponding
+ * indexes. Only candidates with correct term can be applied, other ones are discarded.
+ */
+public class SafeTimeCandidateManager {
+ /** Safe time clock. */
+ private final PendingComparableValuesTracker<HybridTimestamp> safeTimeTracker;
+
+ /** Candidates map. */
+ private final Map<Long, Map<Long, HybridTimestamp>> safeTimeCandidates = new ConcurrentHashMap<>();
+
+ public SafeTimeCandidateManager(PendingComparableValuesTracker<HybridTimestamp> safeTimeTracker) {
+ this.safeTimeTracker = safeTimeTracker;
+ }
+
+ /**
+ * Add safe time candidate.
+ *
+ * @param index Corresponding log index.
+ * @param term Corresponding term.
+ * @param safeTime Safe time candidate.
+ */
+ public void addSafeTimeCandidate(long index, long term, HybridTimestamp safeTime) {
+ safeTimeCandidates.compute(index, (i, candidates) -> {
+ if (candidates == null) {
+ candidates = new HashMap<>();
+ }
+
+ candidates.put(term, safeTime);
+
+ return candidates;
+ });
+ }
+
+ /**
+ * Called on index commit, applies safe time for corresponding index.
+ *
+ * @param prevIndex Previous applied index.
+ * @param index Index.
+ * @param term Term.
+ */
+ public void commitIndex(long prevIndex, long index, long term) {
+ long currentIndex = prevIndex + 1;
+
+ while (currentIndex <= index) {
+ Map<Long, HybridTimestamp> candidates = safeTimeCandidates.remove(currentIndex);
+
+ if (candidates != null) {
+ HybridTimestamp safeTime = null;
+
+ for (Map.Entry<Long, HybridTimestamp> e : candidates.entrySet()) {
+ if (e.getKey() == term) {
+ safeTime = e.getValue();
+ }
+ }
+
+ assert safeTime != null;
+
+ safeTimeTracker.update(safeTime);
+ }
+
+ currentIndex++;
+ }
+ }
+}
diff --git a/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java b/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java
index 6cc9758794..4671588ef4 100644
--- a/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java
@@ -25,7 +25,7 @@ import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.function.Supplier;
-import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
@@ -65,7 +65,7 @@ public class LozaTest extends IgniteAbstractTest {
Mockito.doReturn(mock(MessagingService.class)).when(clusterNetSvc).messagingService();
Mockito.doReturn(mock(TopologyService.class)).when(clusterNetSvc).topologyService();
- Loza loza = new Loza(clusterNetSvc, mock(RaftConfiguration.class), workDir, new HybridClock());
+ Loza loza = new Loza(clusterNetSvc, mock(RaftConfiguration.class), workDir, new HybridClockImpl());
loza.start();
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/FSMCallerTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/FSMCallerTest.java
index dc15eadadd..de8f7666ae 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/FSMCallerTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/FSMCallerTest.java
@@ -18,6 +18,8 @@ package org.apache.ignite.raft.jraft.core;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.raft.jraft.Iterator;
import org.apache.ignite.raft.jraft.JRaftUtils;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
@@ -42,6 +44,7 @@ import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter;
import org.apache.ignite.raft.jraft.test.TestUtils;
import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
+import org.apache.ignite.raft.jraft.util.SafeTimeCandidateManager;
import org.apache.ignite.raft.jraft.util.Utils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -73,6 +76,10 @@ public class FSMCallerTest {
private ExecutorService executor;
+ private PendingComparableValuesTracker<HybridTimestamp> safeTimeTracker = new PendingComparableValuesTracker<>(new HybridTimestamp(1, 0));
+
+ private SafeTimeCandidateManager safeTimeCandidateManager = new SafeTimeCandidateManager(safeTimeTracker);
+
@BeforeEach
public void setup() {
this.fsmCaller = new FSMCallerImpl();
@@ -94,6 +101,7 @@ public class FSMCallerTest {
1024,
() -> new FSMCallerImpl.ApplyTask(),
1));
+ opts.setSafeTimeCandidateManager(safeTimeCandidateManager);
assertTrue(this.fsmCaller.init(opts));
}
@@ -131,6 +139,7 @@ public class FSMCallerTest {
@Test
public void testOnCommitted() throws Exception {
+ safeTimeCandidateManager.addSafeTimeCandidate(11, 1, new HybridTimestamp(1, 1));
final LogEntry log = new LogEntry(EntryType.ENTRY_TYPE_DATA);
log.getId().setIndex(11);
log.getId().setTerm(1);
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/SafeTimeCandidatesManagerTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/SafeTimeCandidatesManagerTest.java
new file mode 100644
index 0000000000..4f07642fbb
--- /dev/null
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/SafeTimeCandidatesManagerTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.util;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Test class for {@link SafeTimeCandidateManager}.
+ */
+public class SafeTimeCandidatesManagerTest {
+ @Test
+ public void test() {
+ PendingComparableValuesTracker<HybridTimestamp> safeTimeClock = new PendingComparableValuesTracker<>(new HybridTimestamp(1, 0));
+
+ SafeTimeCandidateManager safeTimeCandidateManager = new SafeTimeCandidateManager(safeTimeClock);
+
+ safeTimeCandidateManager.addSafeTimeCandidate(1, 1, new HybridTimestamp(1, 1));
+
+ safeTimeCandidateManager.commitIndex(0, 1, 1);
+ assertEquals(new HybridTimestamp(1, 1), safeTimeClock.current());
+
+ safeTimeCandidateManager.addSafeTimeCandidate(2, 1, new HybridTimestamp(10, 1));
+ safeTimeCandidateManager.addSafeTimeCandidate(2, 2, new HybridTimestamp(100, 1));
+ safeTimeCandidateManager.addSafeTimeCandidate(3, 3, new HybridTimestamp(1000, 1));
+
+ safeTimeCandidateManager.commitIndex(1, 2, 2);
+ assertEquals(new HybridTimestamp(100, 1), safeTimeClock.current());
+
+ safeTimeCandidateManager.commitIndex(2, 3, 3);
+ assertEquals(new HybridTimestamp(1000, 1), safeTimeClock.current());
+ }
+}
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
index 447cec4c81..a4381bc402 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
@@ -59,6 +59,14 @@ public class Replica {
replicaGrpId);
return listener.invoke(request);
+ }
+ /**
+ * Replica group identity, this id is the same as the considered partition's id.
+ *
+ * @return Group id.
+ */
+ public ReplicationGroupId groupId() {
+ return replicaGrpId;
}
}
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index a15fd2b7cc..6d95d5d136 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -17,10 +17,15 @@
package org.apache.ignite.internal.replicator;
+import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -33,7 +38,9 @@ import org.apache.ignite.internal.replicator.listener.ReplicaListener;
import org.apache.ignite.internal.replicator.message.ReplicaMessageGroup;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest;
import org.apache.ignite.internal.replicator.message.TimestampAware;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.NodeStoppingException;
@@ -52,6 +59,9 @@ import org.jetbrains.annotations.TestOnly;
* This class allow to start/stop/get a replica.
*/
public class ReplicaManager implements IgniteComponent {
+ /** Idle safe time propagation period. */
+ private static final int IDLE_SAFE_TIME_PROPAGATION_PERIOD_SECONDS = 1;
+
/** The logger. */
private static final IgniteLogger LOG = Loggers.forClass(ReplicaManager.class);
@@ -76,6 +86,10 @@ public class ReplicaManager implements IgniteComponent {
/** A hybrid logical clock. */
private final HybridClock clock;
+ /** Scheduled executor for idle safe time sync. */
+ private final ScheduledExecutorService scheduledIdleSafeTimeSyncExecutor =
+ Executors.newScheduledThreadPool(1, new NamedThreadFactory("scheduled-idle-safe-time-sync-thread", LOG));
+
/** Set of message groups to handler as replica requests. */
Set<Class<?>> messageGroupsToHandle;
@@ -168,7 +182,8 @@ public class ReplicaManager implements IgniteComponent {
*/
public Replica startReplica(
ReplicationGroupId replicaGrpId,
- ReplicaListener listener) throws NodeStoppingException {
+ ReplicaListener listener
+ ) throws NodeStoppingException {
if (!busyLock.enterBusy()) {
throw new NodeStoppingException();
}
@@ -187,7 +202,10 @@ public class ReplicaManager implements IgniteComponent {
* @param listener Replica listener.
* @return New replica.
*/
- private Replica startReplicaInternal(ReplicationGroupId replicaGrpId, ReplicaListener listener) {
+ private Replica startReplicaInternal(
+ ReplicationGroupId replicaGrpId,
+ ReplicaListener listener
+ ) {
var replica = new Replica(replicaGrpId, listener);
Replica previous = replicas.putIfAbsent(replicaGrpId, replica);
@@ -233,6 +251,12 @@ public class ReplicaManager implements IgniteComponent {
public void start() {
clusterNetSvc.messagingService().addMessageHandler(ReplicaMessageGroup.class, handler);
messageGroupsToHandle.forEach(mg -> clusterNetSvc.messagingService().addMessageHandler(mg, handler));
+ scheduledIdleSafeTimeSyncExecutor.scheduleAtFixedRate(
+ this::idleSafeTimeSync,
+ 0,
+ IDLE_SAFE_TIME_PROPAGATION_PERIOD_SECONDS,
+ TimeUnit.SECONDS
+ );
}
/** {@inheritDoc} */
@@ -244,6 +268,8 @@ public class ReplicaManager implements IgniteComponent {
busyLock.block();
+ shutdownAndAwaitTermination(scheduledIdleSafeTimeSyncExecutor, 10, TimeUnit.SECONDS);
+
assert replicas.isEmpty() : "There are replicas alive [replicas=" + replicas.keySet() + ']';
}
@@ -341,6 +367,19 @@ public class ReplicaManager implements IgniteComponent {
}
}
+ /**
+ * Idle safe time sync for replicas.
+ */
+ private void idleSafeTimeSync() {
+ replicas.values().forEach(r -> {
+ ReplicaSafeTimeSyncRequest req = REPLICA_MESSAGES_FACTORY.replicaSafeTimeSyncRequest()
+ .groupId(r.groupId())
+ .build();
+
+ r.processRequest(req);
+ });
+ }
+
/**
* Returns started replication groups.
*
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimeSyncCommand.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimeSyncCommand.java
new file mode 100644
index 0000000000..9b7d3389a2
--- /dev/null
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimeSyncCommand.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator.command;
+
+import org.apache.ignite.raft.client.WriteCommand;
+
+/**
+ * Write command to synchronize safe time periodically.
+ */
+public class SafeTimeSyncCommand implements WriteCommand {
+}
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaMessageGroup.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaMessageGroup.java
index 37485df750..abac579576 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaMessageGroup.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaMessageGroup.java
@@ -38,4 +38,7 @@ public class ReplicaMessageGroup {
/** Message type for {@link TimestampAwareReplicaResponse}. */
public static final short TIMESTAMP_AWARE_REPLICA_RESPONSE = 5;
+
+ /** Message type for {@link ReplicaSafeTimeSyncRequest}. */
+ public static final short SAFE_TIME_SYNC_REQUEST = 6;
}
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaMessageGroup.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaSafeTimeSyncRequest.java
similarity index 51%
copy from modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaMessageGroup.java
copy to modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaSafeTimeSyncRequest.java
index 37485df750..0cd612f9f3 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaMessageGroup.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaSafeTimeSyncRequest.java
@@ -17,25 +17,11 @@
package org.apache.ignite.internal.replicator.message;
-import org.apache.ignite.network.annotations.MessageGroup;
+import org.apache.ignite.network.annotations.Transferable;
/**
- * Message group for the replication process.
+ * Request that initiates safe time synchronization.
*/
-@MessageGroup(groupType = 8, groupName = "ReplicaMessages")
-public class ReplicaMessageGroup {
- /** Message type for {@link ErrorReplicaResponse}. */
- public static final short ERROR_REPLICA_RESPONSE = 1;
-
- /** Message type for {@link ReplicaResponse}. */
- public static final short REPLICA_RESPONSE = 2;
-
- /** Message type for {@link TimestampAware}. */
- public static final short TIMESTAMP_AWARE = 3;
-
- /** Message type for {@link ErrorTimestampAwareReplicaResponse}. */
- public static final short ERROR_TIMESTAMP_AWARE_REPLICA_RESPONSE = 4;
-
- /** Message type for {@link TimestampAwareReplicaResponse}. */
- public static final short TIMESTAMP_AWARE_REPLICA_RESPONSE = 5;
+@Transferable(ReplicaMessageGroup.SAFE_TIME_SYNC_REQUEST)
+public interface ReplicaSafeTimeSyncRequest extends ReplicaRequest {
}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
index 822a1604a0..0cf4b76c0e 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
@@ -43,7 +43,7 @@ import org.apache.ignite.internal.configuration.storage.ConfigurationStorageList
import org.apache.ignite.internal.configuration.storage.DistributedConfigurationStorage;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
-import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
@@ -118,7 +118,7 @@ public class ItDistributedConfigurationPropertiesTest {
new StaticNodeFinder(memberAddrs)
);
- raftManager = new Loza(clusterService, raftConfiguration, workDir, new HybridClock());
+ raftManager = new Loza(clusterService, raftConfiguration, workDir, new HybridClockImpl());
cmgManager = new ClusterManagementGroupManager(
vaultManager,
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
index ce14c25d27..138c9ea6c0 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
@@ -34,7 +34,7 @@ import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManag
import org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
-import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
@@ -91,7 +91,7 @@ public class ItDistributedConfigurationStorageTest {
new StaticNodeFinder(List.of(addr))
);
- raftManager = new Loza(clusterService, raftConfiguration, workDir, new HybridClock());
+ raftManager = new Loza(clusterService, raftConfiguration, workDir, new HybridClockImpl());
cmgManager = new ClusterManagementGroupManager(
vaultManager,
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
index fb5f1593aa..4f2b5ca7af 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
@@ -45,6 +45,7 @@ import org.apache.ignite.internal.configuration.ConfigurationManager;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.IgniteComponent;
@@ -462,15 +463,15 @@ public class ItRebalanceDistributedTest {
lockManager = new HeapLockManager();
- raftManager = new Loza(clusterService, raftConfiguration, dir, new HybridClock());
+ raftManager = new Loza(clusterService, raftConfiguration, dir, new HybridClockImpl());
replicaManager = new ReplicaManager(
clusterService,
- new HybridClock(),
+ new HybridClockImpl(),
Set.of(TableMessageGroup.class, TxMessageGroup.class)
);
- HybridClock hybridClock = new HybridClock();
+ HybridClock hybridClock = new HybridClockImpl();
ReplicaService replicaSvc = new ReplicaService(
clusterService.messagingService(),
@@ -557,7 +558,7 @@ public class ItRebalanceDistributedTest {
metaStorageManager,
schemaManager,
view -> new LocalLogStorageFactory(),
- new HybridClock(),
+ new HybridClockImpl(),
new OutgoingSnapshotsManager(clusterService.messagingService())
);
}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 1fb9fe6695..baaca2d22d 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -63,6 +63,7 @@ import org.apache.ignite.internal.configuration.storage.LocalConfigurationStorag
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
@@ -229,7 +230,7 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
nettyBootstrapFactory
);
- HybridClock hybridClock = new HybridClock();
+ HybridClock hybridClock = new HybridClockImpl();
var raftMgr = new Loza(clusterSvc, raftConfiguration, dir, hybridClock);
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
index ba9eb80043..000e8d5a39 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
@@ -393,6 +393,7 @@ public class ItTablesApiTest extends IgniteAbstractTest {
*
* @throws Exception If failed.
*/
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-18003")
@Test
public void testCreateDropTable() throws Exception {
clusterNodes.forEach(ign -> assertNull(ign.tables().table(TABLE_NAME)));
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index cd600397e8..6c4b0145a1 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -53,6 +53,7 @@ import org.apache.ignite.internal.configuration.storage.ConfigurationStorage;
import org.apache.ignite.internal.configuration.storage.DistributedConfigurationStorage;
import org.apache.ignite.internal.configuration.storage.LocalConfigurationStorage;
import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.index.IndexManager;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
@@ -285,7 +286,7 @@ public class IgniteImpl implements Ignite {
nodeCfgMgr.configurationRegistry().getConfiguration(ComputeConfiguration.KEY)
);
- clock = new HybridClock();
+ clock = new HybridClockImpl();
raftMgr = new Loza(
clusterSvc,
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index 9b3c3f85ac..cfc331940d 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -48,6 +48,7 @@ import org.apache.ignite.internal.configuration.notifications.ConfigurationStora
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.configuration.testframework.InjectRevisionListenerHolder;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.index.IndexManager;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.raft.Loza;
@@ -505,7 +506,7 @@ public class MockedStructuresTest extends IgniteAbstractTest {
msm,
schemaManager,
view -> new LocalLogStorageFactory(),
- null,
+ new HybridClockImpl(),
mock(OutgoingSnapshotsManager.class)
);
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanExecutionTest.java
index 47d70d6953..1e86f9b66f 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanExecutionTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanExecutionTest.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ThreadLocalRandom;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.ByteBufferRow;
@@ -148,7 +149,7 @@ public class TableScanExecutionTest extends AbstractExecutionTest {
PART_CNT,
NetworkAddress::toString,
addr -> Mockito.mock(ClusterNode.class),
- new TxManagerImpl(replicaSvc, new HeapLockManager(), new HybridClock()),
+ new TxManagerImpl(replicaSvc, new HeapLockManager(), new HybridClockImpl()),
mock(MvTableStorage.class),
mock(TxStateTableStorage.class),
replicaSvc,
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index 599265679f..d55d99fb8f 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -42,6 +42,7 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.tx.Timestamp;
@@ -65,7 +66,7 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
protected final UUID txId = newTransactionId();
/** Hybrid clock to generate timestamps. */
- protected final HybridClock clock = new HybridClock();
+ protected final HybridClock clock = new HybridClockImpl();
protected final TestKey key = new TestKey(10, "foo");
private final TestValue value = new TestValue(20, "bar");
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
index e7f628459b..f36e1bc0b2 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
@@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.schema.BinaryRow;
@@ -83,7 +84,7 @@ public abstract class ItAbstractInternalTableScanTest extends IgniteAbstractTest
/** Internal table to test. */
protected InternalTable internalTbl;
- private final HybridClock clock = new HybridClock();
+ private final HybridClock clock = new HybridClockImpl();
/**
* Prepare test environment using DummyInternalTableImpl and Mocked storage.
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
index 9e201b9958..a16ab166a9 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
@@ -35,6 +35,7 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.ByteBufferRow;
@@ -71,7 +72,7 @@ public class ItInternalTableReadOnlyOperationsTest extends IgniteAbstractTest {
new Column[]{new Column("value", NativeTypes.INT64, false)}
);
- private static final HybridClock CLOCK = new HybridClock();
+ private static final HybridClock CLOCK = new HybridClockImpl();
private static final Row ROW_1 = createKeyValueRow(1, 1001);
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java
index 80871da334..15433e7254 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java
@@ -21,6 +21,7 @@ import static org.mockito.Mockito.mock;
import java.util.concurrent.Flow.Publisher;
import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.tx.InternalTransaction;
@@ -34,7 +35,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
*/
@ExtendWith(MockitoExtension.class)
public class ItInternalTableReadOnlyScanTest extends ItAbstractInternalTableScanTest {
- private static final HybridClock CLOCK = new HybridClock();
+ private static final HybridClock CLOCK = new HybridClockImpl();
/** {@inheritDoc} */
@Override
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
index d1815cd608..ac4948c8fb 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
import org.apache.ignite.internal.replicator.ReplicaService;
@@ -106,7 +107,7 @@ public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<Parti
@Override
public void beforeFollowerStop(RaftGroupService service) throws Exception {
// TODO: https://issues.apache.org/jira/browse/IGNITE-17817 Use Replica layer with new transaction protocol.
- TxManagerImpl txManager = new TxManagerImpl(null, new HeapLockManager(), new HybridClock());
+ TxManagerImpl txManager = new TxManagerImpl(null, new HeapLockManager(), new HybridClockImpl());
managers.add(txManager);
@@ -133,7 +134,7 @@ public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<Parti
@Override
public void afterFollowerStop(RaftGroupService service) throws Exception {
// TODO: https://issues.apache.org/jira/browse/IGNITE-17817 Use Replica layer with new transaction protocol.
- TxManagerImpl txManager = new TxManagerImpl(null, new HeapLockManager(), new HybridClock());
+ TxManagerImpl txManager = new TxManagerImpl(null, new HeapLockManager(), new HybridClockImpl());
managers.add(txManager);
@@ -166,7 +167,7 @@ public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<Parti
@Override
public void afterSnapshot(RaftGroupService service) throws Exception {
// TODO: https://issues.apache.org/jira/browse/IGNITE-17817 Use Replica layer with new transaction protocol.
- TxManager txManager = new TxManagerImpl(null, new HeapLockManager(), new HybridClock());
+ TxManager txManager = new TxManagerImpl(null, new HeapLockManager(), new HybridClockImpl());
managers.add(txManager);
@@ -228,7 +229,7 @@ public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<Parti
.map(Map.Entry::getKey)
.findAny()
.orElseGet(() -> {
- TxManagerImpl txManager = new TxManagerImpl(null, new HeapLockManager(), new HybridClock());
+ TxManagerImpl txManager = new TxManagerImpl(null, new HeapLockManager(), new HybridClockImpl());
txManager.start(); // Init listener.
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
index 2e1c83e628..d633b308da 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
@@ -43,6 +43,8 @@ import org.apache.ignite.internal.affinity.RendezvousAffinityFunction;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.raft.Loza;
@@ -83,6 +85,7 @@ import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
import org.apache.ignite.internal.tx.storage.state.test.TestConcurrentHashMapTxStateStorage;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.Lazy;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
@@ -226,7 +229,7 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
assertTrue(waitForTopology(client, nodes + 1, 1000));
- clientClock = new HybridClock();
+ clientClock = new HybridClockImpl();
log.info("Replica manager has been started, node=[" + client.topologyService().localMember() + ']');
@@ -252,11 +255,17 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
for (int i = 0; i < nodes; i++) {
ClusterNode node = cluster.get(i).topologyService().localMember();
- HybridClock clock = new HybridClock();
+ HybridClock clock = new HybridClockImpl();
clocks.put(node, clock);
- var raftSrv = new Loza(cluster.get(i), raftConfiguration, workDir.resolve("node" + i), clock);
+ var raftSrv = new Loza(
+ cluster.get(i),
+ raftConfiguration,
+ workDir.resolve("node" + i),
+ clock,
+ new PendingComparableValuesTracker<>(clock.now())
+ );
raftSrv.start();
@@ -367,6 +376,16 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
null
);
+ Map<ClusterNode, Function<Peer, Boolean>> isLocalPeerCheckerList = cluster.stream().collect(Collectors.toMap(
+ node -> node.topologyService().localMember(),
+ node -> {
+ TopologyService ts = node.topologyService();
+
+ Function<Peer, Boolean> f = peer -> ts.getByAddress(peer.address()).equals(ts.localMember());
+
+ return f;
+ }
+ ));
Int2ObjectOpenHashMap<RaftGroupService> clients = new Int2ObjectOpenHashMap<>();
@@ -425,6 +444,9 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
).thenAccept(
raftSvc -> {
try {
+ PendingComparableValuesTracker<HybridTimestamp> safeTime =
+ new PendingComparableValuesTracker<>(clocks.get(node).now());
+
replicaManagers.get(node).startReplica(
new TablePartitionId(tblId, partId),
new PartitionReplicaListener(
@@ -437,10 +459,13 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
() -> Map.of(pkLocker.id(), pkLocker),
pkStorage,
clocks.get(node),
+ safeTime,
txSateStorage,
topologyServices.get(node),
- placementDriver
- ));
+ placementDriver,
+ isLocalPeerCheckerList.get(node)
+ )
+ );
} catch (NodeStoppingException e) {
fail("Unexpected node stopping", e);
}
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index 5b0cc154f3..f3a4d601cf 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -52,6 +52,7 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
@@ -128,7 +129,7 @@ public class ItColocationTest {
ReplicaService replicaService = Mockito.mock(ReplicaService.class, RETURNS_DEEP_STUBS);
- TxManager txManager = new TxManagerImpl(replicaService, new HeapLockManager(), new HybridClock()) {
+ TxManager txManager = new TxManagerImpl(replicaService, new HeapLockManager(), new HybridClockImpl()) {
@Override
public CompletableFuture<Void> finish(
ReplicationGroupId commitPartition,
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index e368cfde62..222a0896e1 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -77,6 +77,7 @@ import org.apache.ignite.internal.baseline.BaselineManager;
import org.apache.ignite.internal.causality.VersionedValue;
import org.apache.ignite.internal.configuration.util.ConfigurationUtil;
import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.EventListener;
@@ -89,6 +90,7 @@ import org.apache.ignite.internal.metastorage.client.WatchListener;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.server.RaftGroupEventsListener;
import org.apache.ignite.internal.raft.server.RaftGroupOptions;
+import org.apache.ignite.internal.raft.server.ReplicationGroupOptions;
import org.apache.ignite.internal.raft.storage.impl.LogStorageFactoryCreator;
import org.apache.ignite.internal.replicator.ReplicaManager;
import org.apache.ignite.internal.replicator.ReplicaService;
@@ -136,6 +138,7 @@ import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.IgniteNameUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.Lazy;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.internal.utils.RebalanceUtil;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteBiTuple;
@@ -711,6 +714,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
CompletableFuture<Void> startGroupFut = CompletableFuture.completedFuture(null);
+ PendingComparableValuesTracker<HybridTimestamp> safeTime = new PendingComparableValuesTracker<>(clock.now());
+
if (raftMgr.shouldHaveRaftGroupLocally(nodes)) {
startGroupFut = CompletableFuture.supplyAsync(() -> getOrCreateMvPartition(internalTbl.storage(), partId), ioExecutor)
.thenComposeAsync(mvPartitionStorage -> {
@@ -760,7 +765,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
internalTbl.storage(),
internalTbl.txStateStorage(),
partitionKey(internalTbl, partId),
- newPartAssignment
+ newPartAssignment,
+ safeTime
);
try {
@@ -830,9 +836,12 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
new Lazy<>(() -> table.indexStorageAdapters(partId)
.get().get(table.pkId())),
clock,
+ safeTime,
txStatePartitionStorage,
topologyService,
- placementDriver
+ placementDriver,
+ peer -> clusterNodeResolver.apply(peer.address())
+ .equals(topologyService.localMember())
)
);
} catch (NodeStoppingException ex) {
@@ -888,7 +897,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
MvTableStorage mvTableStorage,
TxStateTableStorage txStateTableStorage,
PartitionKey partitionKey,
- Set<ClusterNode> peers
+ Set<ClusterNode> peers,
+ PendingComparableValuesTracker<HybridTimestamp> safeTime
) {
RaftGroupOptions raftGroupOptions;
@@ -911,6 +921,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
incomingSnapshotsExecutor
));
+ raftGroupOptions.replicationGroupOptions(new ReplicationGroupOptions().safeTime(safeTime));
+
return raftGroupOptions;
}
@@ -1711,6 +1723,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
.filter(p -> !assignments.contains(p))
.collect(Collectors.toList());
+ PendingComparableValuesTracker<HybridTimestamp> safeTime = new PendingComparableValuesTracker<>(clock.now());
+
InternalTable internalTable = tbl.internalTable();
try {
@@ -1730,7 +1744,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
internalTable.storage(),
internalTable.txStateStorage(),
partitionKey(internalTable, partId),
- assignments
+ assignments,
+ safeTime
);
RaftGroupListener raftGrpLsnr = new PartitionListener(
@@ -1779,9 +1794,12 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
tbl.indexesLockers(partId),
new Lazy<>(() -> tbl.indexStorageAdapters(partId).get().get(tbl.pkId())),
clock,
+ safeTime,
txStatePartitionStorage,
raftMgr.topologyService(),
- placementDriver
+ placementDriver,
+ peer -> clusterNodeResolver.apply(peer.address())
+ .equals(raftMgr.topologyService().localMember())
)
);
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 9226624e0f..50fa4fba6d 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -36,6 +36,7 @@ import java.util.function.Supplier;
import org.apache.ignite.internal.lock.AutoLockup;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.RowId;
@@ -130,6 +131,8 @@ public class PartitionListener implements RaftGroupListener {
handleFinishTxCommand((FinishTxCommand) command, commandIndex);
} else if (command instanceof TxCleanupCommand) {
handleTxCleanupCommand((TxCleanupCommand) command, commandIndex);
+ } else if (command instanceof SafeTimeSyncCommand) {
+ handleSafeTimeSyncCommand((SafeTimeSyncCommand) command);
} else {
assert false : "Command was not found [cmd=" + command + ']';
}
@@ -272,6 +275,15 @@ public class PartitionListener implements RaftGroupListener {
});
}
+ /**
+ * Handler for the {@link SafeTimeSyncCommand}.
+ *
+ * @param cmd Command.
+ */
+ private void handleSafeTimeSyncCommand(SafeTimeSyncCommand cmd) {
+ // No-op.
+ }
+
/** {@inheritDoc} */
@Override
public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 018a302666..3f974a9781 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -17,7 +17,9 @@
package org.apache.ignite.internal.table.distributed.replicator;
+import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
import static org.apache.ignite.lang.IgniteStringFormatter.format;
@@ -29,23 +31,27 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
+import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand;
import org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
import org.apache.ignite.internal.replicator.exception.ReplicationException;
import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
import org.apache.ignite.internal.replicator.exception.UnsupportedReplicaRequestException;
import org.apache.ignite.internal.replicator.listener.ReplicaListener;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.PartitionTimestampCursor;
@@ -58,6 +64,7 @@ import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowReplicaRequest;
+import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyReplicaRequest;
import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyScanRetrieveBatchReplicaRequest;
import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlySingleRowReplicaRequest;
import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteMultiRowReplicaRequest;
@@ -80,6 +87,7 @@ import org.apache.ignite.internal.tx.message.TxStateReplicaRequest;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.Lazy;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.lang.ErrorGroups.Replicator;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInternalException;
@@ -87,6 +95,7 @@ import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.TopologyService;
import org.apache.ignite.raft.client.Command;
+import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -135,6 +144,9 @@ public class PartitionReplicaListener implements ReplicaListener {
/** Hybrid clock. */
private final HybridClock hybridClock;
+ /** Safe time. */
+ private final PendingComparableValuesTracker<HybridTimestamp> safeTime;
+
/** Placement Driver. */
private final PlacementDriver placementDriver;
@@ -146,6 +158,11 @@ public class PartitionReplicaListener implements ReplicaListener {
private final Supplier<Map<UUID, IndexLocker>> indexesLockers;
+ /**
+ * Function for checking that the given peer is local.
+ */
+ private final Function<Peer, Boolean> isLocalPeerChecker;
+
/**
* The constructor.
*
@@ -156,9 +173,11 @@ public class PartitionReplicaListener implements ReplicaListener {
* @param partId Partition id.
* @param tableId Table id.
* @param hybridClock Hybrid clock.
+ * @param safeTime Safe time clock.
* @param txStateStorage Transaction state storage.
* @param topologyService Topology services.
* @param placementDriver Placement driver.
+ * @param isLocalPeerChecker Function for checking that the given peer is local.
*/
public PartitionReplicaListener(
MvPartitionStorage mvDataStorage,
@@ -170,9 +189,11 @@ public class PartitionReplicaListener implements ReplicaListener {
Supplier<Map<UUID, IndexLocker>> indexesLockers,
Lazy<TableSchemaAwareIndexStorage> pkIndexStorage,
HybridClock hybridClock,
+ PendingComparableValuesTracker<HybridTimestamp> safeTime,
TxStateStorage txStateStorage,
TopologyService topologyService,
- PlacementDriver placementDriver
+ PlacementDriver placementDriver,
+ Function<Peer, Boolean> isLocalPeerChecker
) {
this.mvDataStorage = mvDataStorage;
this.raftClient = raftClient;
@@ -183,9 +204,11 @@ public class PartitionReplicaListener implements ReplicaListener {
this.indexesLockers = indexesLockers;
this.pkIndexStorage = pkIndexStorage;
this.hybridClock = hybridClock;
+ this.safeTime = safeTime;
this.txStateStorage = txStateStorage;
this.topologyService = topologyService;
this.placementDriver = placementDriver;
+ this.isLocalPeerChecker = isLocalPeerChecker;
this.replicationGroupId = new TablePartitionId(tableId, partId);
@@ -212,7 +235,7 @@ public class PartitionReplicaListener implements ReplicaListener {
}
return ensureReplicaIsPrimary(request)
- .thenCompose((ignore) -> {
+ .thenCompose((isPrimary) -> {
if (request instanceof ReadWriteSingleRowReplicaRequest) {
return processSingleEntryAction((ReadWriteSingleRowReplicaRequest) request);
} else if (request instanceof ReadWriteMultiRowReplicaRequest) {
@@ -224,17 +247,19 @@ public class PartitionReplicaListener implements ReplicaListener {
} else if (request instanceof ReadWriteScanCloseReplicaRequest) {
processScanCloseAction((ReadWriteScanCloseReplicaRequest) request);
- return CompletableFuture.completedFuture(null);
+ return completedFuture(null);
} else if (request instanceof TxFinishReplicaRequest) {
return processTxFinishAction((TxFinishReplicaRequest) request);
} else if (request instanceof TxCleanupReplicaRequest) {
return processTxCleanupAction((TxCleanupReplicaRequest) request);
} else if (request instanceof ReadOnlySingleRowReplicaRequest) {
- return processReadOnlySingleEntryAction((ReadOnlySingleRowReplicaRequest) request);
+ return processReadOnlySingleEntryAction((ReadOnlySingleRowReplicaRequest) request, isPrimary);
} else if (request instanceof ReadOnlyMultiRowReplicaRequest) {
- return processReadOnlyMultiEntryAction((ReadOnlyMultiRowReplicaRequest) request);
+ return processReadOnlyMultiEntryAction((ReadOnlyMultiRowReplicaRequest) request, isPrimary);
} else if (request instanceof ReadOnlyScanRetrieveBatchReplicaRequest) {
- return processReadOnlyScanRetrieveBatchAction((ReadOnlyScanRetrieveBatchReplicaRequest) request);
+ return processReadOnlyScanRetrieveBatchAction((ReadOnlyScanRetrieveBatchReplicaRequest) request, isPrimary);
+ } else if (request instanceof ReplicaSafeTimeSyncRequest) {
+ return processReplicaSafeTimeSyncRequest((ReplicaSafeTimeSyncRequest) request);
} else {
throw new UnsupportedReplicaRequestException(request.getClass());
}
@@ -258,7 +283,7 @@ public class PartitionReplicaListener implements ReplicaListener {
return txStateFut.thenApply(txMeta -> new IgniteBiTuple<>(txMeta, null));
} else {
- return CompletableFuture.completedFuture(
+ return completedFuture(
new IgniteBiTuple<>(null, topologyService.getByAddress(leaderAddress)));
}
}
@@ -299,9 +324,15 @@ public class PartitionReplicaListener implements ReplicaListener {
* Processes retrieve batch for read only transaction.
*
* @param request Read only retrieve batch request.
+ * @param isPrimary Whether the given replica is primary.
* @return Result future.
*/
- private CompletableFuture<Object> processReadOnlyScanRetrieveBatchAction(ReadOnlyScanRetrieveBatchReplicaRequest request) {
+ private CompletableFuture<Object> processReadOnlyScanRetrieveBatchAction(
+ ReadOnlyScanRetrieveBatchReplicaRequest request,
+ Boolean isPrimary
+ ) {
+ requireNonNull(isPrimary);
+
UUID txId = request.transactionId();
int batchCount = request.batchSize();
HybridTimestamp readTimestamp = request.readTimestamp();
@@ -313,24 +344,30 @@ public class PartitionReplicaListener implements ReplicaListener {
@SuppressWarnings("resource") PartitionTimestampCursor cursor = cursors.computeIfAbsent(cursorId,
id -> mvDataStorage.scan(HybridTimestamp.MAX_VALUE));
- while (batchRows.size() < batchCount && cursor.hasNext()) {
- BinaryRow resolvedReadResult = resolveReadResult(cursor.next(), readTimestamp, () -> cursor.committed(readTimestamp));
+ CompletableFuture<Void> safeReadFuture = isPrimary ? completedFuture(null) : safeTime.waitFor(readTimestamp);
- if (resolvedReadResult != null) {
- batchRows.add(resolvedReadResult);
+ // TODO https://issues.apache.org/jira/browse/IGNITE-17824 Dedicated thread pool should be used.
+ return safeReadFuture.thenApplyAsync(ignored -> {
+ while (batchRows.size() < batchCount && cursor.hasNext()) {
+ BinaryRow resolvedReadResult = resolveReadResult(cursor.next(), readTimestamp, () -> cursor.committed(readTimestamp));
+
+ if (resolvedReadResult != null) {
+ batchRows.add(resolvedReadResult);
+ }
}
- }
- return CompletableFuture.completedFuture(batchRows);
+ return batchRows;
+ });
}
/**
* Processes single entry request for read only transaction.
*
* @param request Read only single entry request.
+ * @param isPrimary Whether the given replica is primary.
* @return Result future.
*/
- private CompletableFuture<Object> processReadOnlySingleEntryAction(ReadOnlySingleRowReplicaRequest request) {
+ private CompletableFuture<Object> processReadOnlySingleEntryAction(ReadOnlySingleRowReplicaRequest request, Boolean isPrimary) {
BinaryRow tableRow = request.binaryRow();
HybridTimestamp readTimestamp = request.readTimestamp();
@@ -339,32 +376,52 @@ public class PartitionReplicaListener implements ReplicaListener {
format("Unknown single request [actionType={}]", request.requestType()));
}
- //TODO: IGNITE-17868 Integrate indexes into rowIds resolution along with proper lock management on search rows.
+ CompletableFuture<Void> safeReadFuture = isPrimary ? completedFuture(null) : safeTime.waitFor(request.readTimestamp());
- return resolveRowByPk(tableRow, readTimestamp, (rowId, binaryRow) -> CompletableFuture.completedFuture(binaryRow));
+ // TODO https://issues.apache.org/jira/browse/IGNITE-17824 Dedicated thread pool should be used.
+ return safeReadFuture.thenApplyAsync(ignored -> {
+ //TODO: IGNITE-17868 Integrate indexes into rowIds resolution along with proper lock management on search rows.
+ return resolveRowByPk(tableRow, readTimestamp, (rowId, binaryRow) -> binaryRow);
+ });
}
/**
* Processes multiple entries request for read only transaction.
*
* @param request Read only multiple entries request.
+ * @param isPrimary Whether the given replica is primary.
* @return Result future.
*/
- private CompletableFuture<Object> processReadOnlyMultiEntryAction(ReadOnlyMultiRowReplicaRequest request) {
+ private CompletableFuture<Object> processReadOnlyMultiEntryAction(ReadOnlyMultiRowReplicaRequest request, Boolean isPrimary) {
if (request.requestType() != RequestType.RO_GET_ALL) {
throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
format("Unknown single request [actionType={}]", request.requestType()));
}
- ArrayList<BinaryRow> result = new ArrayList<>(request.binaryRows().size());
+ CompletableFuture<Void> safeReadFuture = isPrimary ? completedFuture(null) : safeTime.waitFor(request.readTimestamp());
- for (BinaryRow searchRow : request.binaryRows()) {
- BinaryRow row = resolveRowByPk(searchRow, request.readTimestamp(), (rowId, binaryRow) -> binaryRow);
+ // TODO https://issues.apache.org/jira/browse/IGNITE-17824 Dedicated thread pool should be used.
+ return safeReadFuture.thenApplyAsync(ignored -> {
+ ArrayList<BinaryRow> result = new ArrayList<>(request.binaryRows().size());
- result.add(row);
- }
+ for (BinaryRow searchRow : request.binaryRows()) {
+ BinaryRow row = resolveRowByPk(searchRow, request.readTimestamp(), (rowId, binaryRow) -> binaryRow);
+
+ result.add(row);
+ }
+
+ return result;
+ });
+ }
- return CompletableFuture.completedFuture(result);
+ /**
+ * Handler to process {@link ReplicaSafeTimeSyncRequest}.
+ *
+ * @param request Request.
+ * @return Future.
+ */
+ private CompletionStage<Object> processReplicaSafeTimeSyncRequest(ReplicaSafeTimeSyncRequest request) {
+ return raftClient.run(new SafeTimeSyncCommand());
}
/**
@@ -452,7 +509,7 @@ public class PartitionReplicaListener implements ReplicaListener {
}
}
- return CompletableFuture.completedFuture(batchRows);
+ return completedFuture(batchRows);
});
}
@@ -674,7 +731,7 @@ public class PartitionReplicaListener implements ReplicaListener {
for (BinaryRow searchRow : request.binaryRows()) {
rowFuts[i++] = resolveRowByPk(searchRow, txId, (rowId, row) -> {
if (rowId == null) {
- return CompletableFuture.completedFuture(null);
+ return completedFuture(null);
}
return takeLocksForGet(rowId, txId)
@@ -690,7 +747,7 @@ public class PartitionReplicaListener implements ReplicaListener {
result.add(rowFuts[idx].join());
}
- return CompletableFuture.completedFuture(result);
+ return completedFuture(result);
});
}
case RW_DELETE_ALL: {
@@ -701,7 +758,7 @@ public class PartitionReplicaListener implements ReplicaListener {
for (BinaryRow searchRow : request.binaryRows()) {
rowIdLockFuts[i++] = resolveRowByPk(searchRow, txId, (rowId, row) -> {
if (rowId == null) {
- return CompletableFuture.completedFuture(null);
+ return completedFuture(null);
}
return takeLocksForDelete(searchRow, rowId, txId);
@@ -725,7 +782,7 @@ public class PartitionReplicaListener implements ReplicaListener {
}
if (rowIdsToDelete.isEmpty()) {
- return CompletableFuture.completedFuture(result);
+ return completedFuture(result);
}
return applyCmdWithExceptionHandling(new UpdateAllCommand(committedPartitionId, rowIdsToDelete, txId))
@@ -740,7 +797,7 @@ public class PartitionReplicaListener implements ReplicaListener {
for (BinaryRow searchRow : request.binaryRows()) {
deleteExactLockFuts[i++] = resolveRowByPk(searchRow, txId, (rowId, row) -> {
if (rowId == null) {
- return CompletableFuture.completedFuture(null);
+ return completedFuture(null);
}
return takeLocksForDeleteExact(searchRow, rowId, row, txId);
@@ -763,7 +820,7 @@ public class PartitionReplicaListener implements ReplicaListener {
}
}
- CompletableFuture raftFut = rowIdsToDelete.isEmpty() ? CompletableFuture.completedFuture(null)
+ CompletableFuture raftFut = rowIdsToDelete.isEmpty() ? completedFuture(null)
: applyCmdWithExceptionHandling(new UpdateAllCommand(committedPartitionId, rowIdsToDelete, txId));
return raftFut.thenApply(ignored -> result);
@@ -776,7 +833,7 @@ public class PartitionReplicaListener implements ReplicaListener {
for (BinaryRow searchRow : request.binaryRows()) {
pkReadLockFuts[i++] = resolveRowByPk(searchRow, txId,
- (rowId, row) -> CompletableFuture.completedFuture(rowId));
+ (rowId, row) -> completedFuture(rowId));
}
return allOf(pkReadLockFuts).thenCompose(ignore -> {
@@ -800,7 +857,7 @@ public class PartitionReplicaListener implements ReplicaListener {
}
if (rowsToInsert.isEmpty()) {
- return CompletableFuture.completedFuture(result);
+ return completedFuture(result);
}
CompletableFuture<RowId>[] insertLockFuts = new CompletableFuture[rowsToInsert.size()];
@@ -846,7 +903,7 @@ public class PartitionReplicaListener implements ReplicaListener {
}
if (rowsToUpdate.isEmpty()) {
- return CompletableFuture.completedFuture(null);
+ return completedFuture(null);
}
return applyCmdWithExceptionHandling(new UpdateAllCommand(committedPartitionId, rowsToUpdate, txId))
@@ -902,7 +959,7 @@ public class PartitionReplicaListener implements ReplicaListener {
case RW_GET: {
return resolveRowByPk(searchRow, txId, (rowId, row) -> {
if (rowId == null) {
- return CompletableFuture.completedFuture(null);
+ return completedFuture(null);
}
return takeLocksForGet(rowId, txId)
@@ -912,7 +969,7 @@ public class PartitionReplicaListener implements ReplicaListener {
case RW_DELETE: {
return resolveRowByPk(searchRow, txId, (rowId, row) -> {
if (rowId == null) {
- return CompletableFuture.completedFuture(false);
+ return completedFuture(false);
}
return takeLocksForDelete(searchRow, rowId, txId)
@@ -923,7 +980,7 @@ public class PartitionReplicaListener implements ReplicaListener {
case RW_GET_AND_DELETE: {
return resolveRowByPk(searchRow, txId, (rowId, row) -> {
if (rowId == null) {
- return CompletableFuture.completedFuture(null);
+ return completedFuture(null);
}
return takeLocksForDelete(searchRow, rowId, txId)
@@ -934,13 +991,13 @@ public class PartitionReplicaListener implements ReplicaListener {
case RW_DELETE_EXACT: {
return resolveRowByPk(searchRow, txId, (rowId, row) -> {
if (rowId == null) {
- return CompletableFuture.completedFuture(false);
+ return completedFuture(false);
}
return takeLocksForDeleteExact(searchRow, rowId, row, txId)
.thenCompose(validatedRowId -> {
if (validatedRowId == null) {
- return CompletableFuture.completedFuture(false);
+ return completedFuture(false);
}
return applyCmdWithExceptionHandling(new UpdateCommand(commitPartitionId, validatedRowId, txId))
@@ -951,7 +1008,7 @@ public class PartitionReplicaListener implements ReplicaListener {
case RW_INSERT: {
return resolveRowByPk(searchRow, txId, (rowId, row) -> {
if (rowId != null) {
- return CompletableFuture.completedFuture(false);
+ return completedFuture(false);
}
RowId rowId0 = new RowId(partId);
@@ -997,7 +1054,7 @@ public class PartitionReplicaListener implements ReplicaListener {
case RW_GET_AND_REPLACE: {
return resolveRowByPk(searchRow, txId, (rowId, row) -> {
if (rowId == null) {
- return CompletableFuture.completedFuture(null);
+ return completedFuture(null);
}
return takeLocksForUpdate(searchRow, rowId, txId)
@@ -1009,7 +1066,7 @@ public class PartitionReplicaListener implements ReplicaListener {
case RW_REPLACE_IF_EXIST: {
return resolveRowByPk(searchRow, txId, (rowId, row) -> {
if (rowId == null) {
- return CompletableFuture.completedFuture(false);
+ return completedFuture(false);
}
return takeLocksForUpdate(searchRow, rowId, txId)
@@ -1055,7 +1112,7 @@ public class PartitionReplicaListener implements ReplicaListener {
Collection<IndexLocker> indexes = indexesLockers.get().values();
if (nullOrEmpty(indexes)) {
- return CompletableFuture.completedFuture(null);
+ return completedFuture(null);
}
CompletableFuture<?>[] locks = new CompletableFuture[indexes.size()];
@@ -1072,7 +1129,7 @@ public class PartitionReplicaListener implements ReplicaListener {
Collection<IndexLocker> indexes = indexesLockers.get().values();
if (nullOrEmpty(indexes)) {
- return CompletableFuture.completedFuture(null);
+ return completedFuture(null);
}
CompletableFuture<?>[] locks = new CompletableFuture[indexes.size()];
@@ -1101,7 +1158,7 @@ public class PartitionReplicaListener implements ReplicaListener {
.thenApply(exclusiveRowLock -> rowId);
}
- return CompletableFuture.completedFuture(null);
+ return completedFuture(null);
});
}
@@ -1148,13 +1205,13 @@ public class PartitionReplicaListener implements ReplicaListener {
if (request.requestType() == RequestType.RW_REPLACE) {
return resolveRowByPk(newRow, txId, (rowId, row) -> {
if (rowId == null) {
- return CompletableFuture.completedFuture(false);
+ return completedFuture(false);
}
return takeLocksForReplace(expectedRow, row, newRow, rowId, txId)
.thenCompose(validatedRowId -> {
if (validatedRowId == null) {
- return CompletableFuture.completedFuture(false);
+ return completedFuture(false);
}
return applyCmdWithExceptionHandling(new UpdateCommand(commitPartitionId, validatedRowId, newRow, txId))
@@ -1184,7 +1241,7 @@ public class PartitionReplicaListener implements ReplicaListener {
.thenApply(rowLock -> rowId);
}
- return CompletableFuture.completedFuture(null);
+ return completedFuture(null);
});
}
@@ -1192,9 +1249,9 @@ public class PartitionReplicaListener implements ReplicaListener {
* Ensure that the primary replica was not changed.
*
* @param request Replica request.
- * @return Future.
+ * @return Future. The result is not null only for {@link ReadOnlyReplicaRequest}. If {@code true}, then replica is primary.
*/
- private CompletableFuture<Void> ensureReplicaIsPrimary(ReplicaRequest request) {
+ private CompletableFuture<Boolean> ensureReplicaIsPrimary(ReplicaRequest request) {
Long expectedTerm;
if (request instanceof ReadWriteReplicaRequest) {
@@ -1219,14 +1276,16 @@ public class PartitionReplicaListener implements ReplicaListener {
Long currentTerm = replicaAndTerm.get2();
if (expectedTerm.equals(currentTerm)) {
- return CompletableFuture.completedFuture(null);
+ return completedFuture(null);
} else {
return CompletableFuture.failedFuture(new PrimaryReplicaMissException(expectedTerm, currentTerm));
}
}
);
+ } else if (request instanceof ReadOnlyReplicaRequest) {
+ return raftClient.refreshAndGetLeaderWithTerm().thenApply(replicaAndTerm -> isLocalPeerChecker.apply(replicaAndTerm.get1()));
} else {
- return CompletableFuture.completedFuture(null);
+ return completedFuture(null);
}
}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
index 92ea9c80bf..0c6a57dce7 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
@@ -26,7 +26,7 @@ import static org.mockito.Mockito.when;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.listener.ReplicaListener;
@@ -77,7 +77,7 @@ public class TxLocalTest extends TxAbstractTest {
}
).when(replicaSvc).invoke(any(), any());
- txManager = new TxManagerImpl(replicaSvc, lockManager, new HybridClock());
+ txManager = new TxManagerImpl(replicaSvc, lockManager, new HybridClockImpl());
igniteTransactions = new IgniteTransactionsImpl(txManager);
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index cde26f39f1..e0d6484963 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -72,6 +72,7 @@ import org.apache.ignite.internal.configuration.notifications.ConfigurationStora
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.configuration.testframework.InjectRevisionListenerHolder;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
@@ -770,7 +771,7 @@ public class TableManagerTest extends IgniteAbstractTest {
msm,
sm = new SchemaManager(revisionUpdater, tblsCfg, msm),
budgetView -> new LocalLogStorageFactory(),
- null,
+ new HybridClockImpl(),
new OutgoingSnapshotsManager(messagingService)
);
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
index f5435165e7..8d818f7ac6 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
@@ -34,6 +34,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.Column;
@@ -144,7 +145,7 @@ public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest {
@Test
public void testTxCleanupCommand() throws Exception {
- HybridClock clock = new HybridClock();
+ HybridClock clock = new HybridClockImpl();
TxCleanupCommand cmd = new TxCleanupCommand(UUID.randomUUID(), true, clock.now());
@@ -157,7 +158,7 @@ public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest {
@Test
public void testFinishTxCommand() throws Exception {
- HybridClock clock = new HybridClock();
+ HybridClock clock = new HybridClockImpl();
ArrayList<ReplicationGroupId> grps = new ArrayList<>(10);
for (int i = 0; i < 10; i++) {
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index d91f3c34e7..9c0bb68cfd 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -40,6 +40,7 @@ import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.ignite.distributed.TestPartitionDataStorage;
import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.schema.BinaryRow;
@@ -94,7 +95,7 @@ public class PartitionCommandListenerTest {
);
/** Hybrid clock. */
- private static final HybridClock CLOCK = new HybridClock();
+ private static final HybridClock CLOCK = new HybridClockImpl();
/** Table command listener. */
private PartitionListener commandListener;
@@ -131,7 +132,7 @@ public class PartitionCommandListenerTest {
commandListener = new PartitionListener(
new TestPartitionDataStorage(mvPartitionStorage),
new TestConcurrentHashMapTxStateStorage(),
- new TxManagerImpl(replicaService, new HeapLockManager(), new HybridClock()),
+ new TxManagerImpl(replicaService, new HeapLockManager(), new HybridClockImpl()),
() -> Map.of(pkStorage.id(), pkStorage)
);
}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
index 7c82c79035..4d75465b2a 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
@@ -47,6 +47,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.Column;
@@ -105,7 +106,7 @@ public class IncomingSnapshotCopierTest {
new Column[]{new Column("value", NativeTypes.stringOf(256), false)}
);
- private static final HybridClock HYBRID_CLOCK = new HybridClock();
+ private static final HybridClock HYBRID_CLOCK = new HybridClockImpl();
private static final TableMessagesFactory TABLE_MSG_FACTORY = new TableMessagesFactory();
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotTest.java
index e5af7246d5..36e67373cb 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotTest.java
@@ -36,6 +36,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.schema.ByteBufferRow;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.ReadResult;
@@ -74,7 +75,7 @@ class OutgoingSnapshotTest {
private RowId rowIdOutOfOrder;
- private final HybridClock clock = new HybridClock();
+ private final HybridClock clock = new HybridClockImpl();
private final UUID transactionId = UUID.randomUUID();
private final UUID commitTableId = UUID.randomUUID();
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorageTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorageTest.java
index 37e9dbfb39..b3b4dcd8ca 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorageTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorageTest.java
@@ -32,7 +32,7 @@ import static org.mockito.Mockito.when;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lock.AutoLockup;
import org.apache.ignite.internal.schema.BinaryRow;
@@ -142,7 +142,7 @@ class SnapshotAwarePartitionDataStorageTest {
@Test
void delegatesCommitWrite() {
- HybridTimestamp commitTs = new HybridClock().now();
+ HybridTimestamp commitTs = new HybridClockImpl().now();
testedStorage.commitWrite(rowId, commitTs);
@@ -268,7 +268,7 @@ class SnapshotAwarePartitionDataStorageTest {
COMMIT_WRITE {
@Override
void executeOn(SnapshotAwarePartitionDataStorage storage, RowId rowId) {
- storage.commitWrite(rowId, new HybridClock().now());
+ storage.commitWrite(rowId, new HybridClockImpl().now());
}
};
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
index b3f59b0f1c..facbd001da 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
@@ -34,6 +34,7 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryConverter;
import org.apache.ignite.internal.schema.BinaryRow;
@@ -72,6 +73,7 @@ import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.storage.state.test.TestConcurrentHashMapTxStateStorage;
import org.apache.ignite.internal.util.Lazy;
import org.apache.ignite.internal.util.Pair;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.network.TopologyService;
@@ -91,7 +93,7 @@ public class PartitionReplicaListenerIndexLockingTest extends IgniteAbstractTest
private static final UUID HASH_INDEX_ID = new UUID(0L, 2L);
private static final UUID SORTED_INDEX_ID = new UUID(0L, 3L);
private static final UUID TRANSACTION_ID = Timestamp.nextVersion().toUuid();
- private static final HybridClock CLOCK = new HybridClock();
+ private static final HybridClock CLOCK = new HybridClockImpl();
private static final LockManager LOCK_MANAGER = new HeapLockManager();
private static final TablePartitionId PARTITION_ID = new TablePartitionId(TABLE_ID, PART_ID);
private static final TableMessagesFactory TABLE_MESSAGES_FACTORY = new TableMessagesFactory();
@@ -170,9 +172,11 @@ public class PartitionReplicaListenerIndexLockingTest extends IgniteAbstractTest
),
pkStorage,
CLOCK,
+ new PendingComparableValuesTracker<>(CLOCK.now()),
new TestConcurrentHashMapTxStateStorage(),
mock(TopologyService.class),
- mock(PlacementDriver.class)
+ mock(PlacementDriver.class),
+ peer -> true
);
kvMarshaller = new ReflectionMarshallerFactory().create(schemaDescriptor, Integer.class, Integer.class);
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index f8623eb19c..f2de21750d 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -34,6 +34,7 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.schema.BinaryRow;
@@ -71,6 +72,7 @@ import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.message.TxMessagesFactory;
import org.apache.ignite.internal.tx.storage.state.test.TestConcurrentHashMapTxStateStorage;
import org.apache.ignite.internal.util.Lazy;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.network.ClusterNode;
@@ -101,7 +103,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
private static final ReplicationGroupId grpId = new TablePartitionId(tblId, partId);
/** Hybrid clock. */
- private static final HybridClock clock = new HybridClock();
+ private static final HybridClock clock = new HybridClockImpl();
/** The storage stores transaction states. */
private static final TestConcurrentHashMapTxStateStorage txStateStorage = new TestConcurrentHashMapTxStateStorage();
@@ -186,6 +188,9 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
return CompletableFuture.completedFuture(txMeta);
});
+ PendingComparableValuesTracker safeTimeClock = mock(PendingComparableValuesTracker.class);
+ when(safeTimeClock.waitFor(any())).thenReturn(CompletableFuture.completedFuture(null));
+
UUID indexId = UUID.randomUUID();
BinaryTupleSchema pkSchema = BinaryTupleSchema.create(new Element[]{
@@ -214,9 +219,11 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
() -> Map.of(pkLocker.id(), pkLocker),
pkStorage,
clock,
+ safeTimeClock,
txStateStorage,
topologySrv,
- placementDriver
+ placementDriver,
+ peer -> true
);
marshallerFactory = new ReflectionMarshallerFactory();
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 04857460ff..07fd8a0c35 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -32,6 +32,7 @@ import java.util.function.Function;
import javax.naming.OperationNotSupportedException;
import org.apache.ignite.distributed.TestPartitionDataStorage;
import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.listener.ReplicaListener;
@@ -59,6 +60,7 @@ import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
import org.apache.ignite.internal.tx.storage.state.test.TestConcurrentHashMapTxStateStorage;
import org.apache.ignite.internal.util.Lazy;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.network.ClusterNode;
@@ -140,11 +142,11 @@ public class DummyInternalTableImpl extends InternalTableImpl {
1,
NetworkAddress::toString,
addr -> Mockito.mock(ClusterNode.class),
- txManager == null ? new TxManagerImpl(replicaSvc, new HeapLockManager(), new HybridClock()) : txManager,
+ txManager == null ? new TxManagerImpl(replicaSvc, new HeapLockManager(), new HybridClockImpl()) : txManager,
mock(MvTableStorage.class),
mock(TxStateTableStorage.class),
replicaSvc,
- new HybridClock()
+ new HybridClockImpl()
);
RaftGroupService svc = partitionMap.get(0);
@@ -228,6 +230,8 @@ public class DummyInternalTableImpl extends InternalTableImpl {
IndexLocker pkLocker = new HashIndexLocker(indexId, true, this.txManager.lockManager(), row2tuple);
+ HybridClock clock = new HybridClockImpl();
+
replicaListener = new PartitionReplicaListener(
mvPartStorage,
partitionMap.get(0),
@@ -237,10 +241,12 @@ public class DummyInternalTableImpl extends InternalTableImpl {
tableId,
() -> Map.of(pkLocker.id(), pkLocker),
pkStorage,
- new HybridClock(),
+ clock,
+ new PendingComparableValuesTracker<>(clock.now()),
+ null,
null,
null,
- null
+ peer -> true
);
partitionListener = new PartitionListener(
diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
index 5a6ffa759f..54738fb3e8 100644
--- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
+++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
@@ -25,7 +25,7 @@ import static org.mockito.Answers.RETURNS_DEEP_STUBS;
import java.util.Objects;
import java.util.UUID;
-import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
@@ -67,7 +67,7 @@ public class TxManagerTest extends IgniteAbstractTest {
replicaService = Mockito.mock(ReplicaService.class, RETURNS_DEEP_STUBS);
- txManager = new TxManagerImpl(replicaService, new HeapLockManager(), new HybridClock());
+ txManager = new TxManagerImpl(replicaService, new HeapLockManager(), new HybridClockImpl());
}
@Test