You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sa...@apache.org on 2022/10/31 11:42:29 UTC
[ignite-3] branch main updated: IGNITE-17975 ItIgniteNodeRestartTest and ItIgniteInMemoryNodeRestartTest unmuted (#1250)
This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new ffd2cd9c18 IGNITE-17975 ItIgniteNodeRestartTest and ItIgniteInMemoryNodeRestartTest unmuted (#1250)
ffd2cd9c18 is described below
commit ffd2cd9c18888181973d3e36bc9d6eb42b864485
Author: Mirza Aliev <al...@gmail.com>
AuthorDate: Mon Oct 31 14:42:23 2022 +0300
IGNITE-17975 ItIgniteNodeRestartTest and ItIgniteInMemoryNodeRestartTest unmuted (#1250)
---
.../internal/rocksdb/flush/RocksDbFlusher.java | 2 +-
.../app/ItIgniteInMemoryNodeRestartTest.java | 3 +-
.../runner/app/ItIgniteNodeRestartTest.java | 43 ++++++++++++++++------
.../internal/storage/MvPartitionStorage.java | 11 ------
.../storage/impl/TestMvPartitionStorage.java | 20 ----------
.../mv/AbstractPageMemoryMvPartitionStorage.java | 6 ---
.../storage/rocksdb/RocksDbMvPartitionStorage.java | 32 ----------------
.../internal/table/distributed/TableManager.java | 1 -
.../replicator/PartitionReplicaListener.java | 2 +-
9 files changed, 36 insertions(+), 84 deletions(-)
diff --git a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
index 77d69a2449..c139fde2f4 100644
--- a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
+++ b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
@@ -99,7 +99,7 @@ public class RocksDbFlusher {
* @param delaySupplier Supplier of delay values to batch independent flush requests. When {@link #awaitFlush(boolean)} is called with
* {@code true} parameter, the flusher waits given number of milliseconds (using {@code scheduledPool}) and then executes flush
* only if there were no other {@code awaitFlush(true)} calls. Otherwise, it does nothing after the timeout. This guarantees that
- * either the last one wins, or automatic flush wins if there's an enlless stream of {@code awaitFlush(true)} calls with very small
+ * either the last one wins, or automatic flush wins if there's an endless stream of {@code awaitFlush(true)} calls with very small
* time-intervals between them. Such behavior allows to save on unnecessary flushes when multiple await flush calls appear at
* roughly the same time from different threads. For example, several partitions might be flushed at the same time, because they
* started at the same time and their flush frequency is also the same.
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
index 7363c5428a..4d39de926c 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
@@ -59,7 +59,6 @@ import org.junit.jupiter.api.TestInfo;
/**
* These tests check in-memory node restart scenarios.
*/
-@Disabled("https://issues.apache.org/jira/browse/IGNITE-17302")
public class ItIgniteInMemoryNodeRestartTest extends IgniteAbstractTest {
/** Default node port. */
private static final int DEFAULT_NODE_PORT = 3344;
@@ -179,6 +178,7 @@ public class ItIgniteInMemoryNodeRestartTest extends IgniteAbstractTest {
* Restarts an in-memory node that is not a leader of the table's partition.
*/
@Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-17986")
public void inMemoryNodeRestartNotLeader(TestInfo testInfo) throws Exception {
// Start three nodes, the first one is going to be CMG and MetaStorage leader.
IgniteImpl ignite = startNode(testInfo, 0);
@@ -269,6 +269,7 @@ public class ItIgniteInMemoryNodeRestartTest extends IgniteAbstractTest {
* Restarts all the nodes with the partition.
*/
@Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-17986")
public void inMemoryNodeFullPartitionRestart(TestInfo testInfo) throws Exception {
// Start three nodes, the first one is going to be CMG and MetaStorage leader.
IgniteImpl ignite0 = startNode(testInfo, 0);
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index baaca2d22d..0959648de1 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -38,6 +38,7 @@ import java.util.Collection;
import java.util.List;
import java.util.ListIterator;
import java.util.ServiceLoader;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;
@@ -64,6 +65,7 @@ import org.apache.ignite.internal.configuration.testframework.ConfigurationExten
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.index.IndexManager;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
@@ -74,7 +76,9 @@ import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.raft.storage.impl.LocalLogStorageFactory;
import org.apache.ignite.internal.recovery.ConfigurationCatchUpListener;
import org.apache.ignite.internal.recovery.RecoveryCompletionFutureFactory;
+import org.apache.ignite.internal.replicator.ReplicaManager;
import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesSerializationRegistryInitializer;
import org.apache.ignite.internal.schema.SchemaManager;
import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import org.apache.ignite.internal.storage.DataStorageManager;
@@ -82,12 +86,14 @@ import org.apache.ignite.internal.storage.DataStorageModule;
import org.apache.ignite.internal.storage.DataStorageModules;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.internal.table.distributed.TableMessageGroup;
import org.apache.ignite.internal.table.distributed.TableMessagesSerializationRegistryInitializer;
import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.testframework.WithSystemProperty;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
+import org.apache.ignite.internal.tx.message.TxMessageGroup;
import org.apache.ignite.internal.tx.message.TxMessagesSerializationRegistryInitializer;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.vault.VaultManager;
@@ -117,7 +123,6 @@ import org.junit.jupiter.api.extension.ExtendWith;
* These tests check node restart scenarios.
*/
@WithSystemProperty(key = CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY, value = "0")
-@Disabled("https://issues.apache.org/jira/browse/IGNITE-17302")
@ExtendWith(ConfigurationExtension.class)
public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
/** Default node port. */
@@ -219,6 +224,7 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
RaftMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
TxMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
TableMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
+ ReplicaMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
var clusterLocalConfiguration = new ClusterLocalConfiguration(name, serializationRegistry);
@@ -234,7 +240,17 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
var raftMgr = new Loza(clusterSvc, raftConfiguration, dir, hybridClock);
- var txManager = new TxManagerImpl(null, new HeapLockManager(), hybridClock);
+ ReplicaManager replicaMgr = new ReplicaManager(
+ clusterSvc,
+ hybridClock,
+ Set.of(TableMessageGroup.class, TxMessageGroup.class)
+ );
+
+ var replicaService = new ReplicaService(clusterSvc.messagingService(), hybridClock);
+
+ var lockManager = new HeapLockManager();
+
+ var txManager = new TxManagerImpl(replicaService, lockManager, hybridClock);
var cmgManager = new ClusterManagementGroupManager(
vault,
@@ -282,18 +298,14 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
SchemaManager schemaManager = new SchemaManager(registry, tblCfg, metaStorageMgr);
- ReplicaService replicaSvc = new ReplicaService(
- clusterSvc.messagingService(),
- null);
-
TableManager tableManager = new TableManager(
name,
registry,
tblCfg,
raftMgr,
- null,
- null,
- replicaSvc,
+ replicaMgr,
+ lockManager,
+ replicaService,
mock(BaselineManager.class),
clusterSvc.topologyService(),
txManager,
@@ -302,10 +314,12 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
metaStorageMgr,
schemaManager,
view -> new LocalLogStorageFactory(),
- null,
+ hybridClock,
new OutgoingSnapshotsManager(clusterSvc.messagingService())
);
+ var indexManager = new IndexManager(tblCfg, schemaManager, tableManager);
+
// Preparing the result map.
partialNode.add(vault);
@@ -335,12 +349,14 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
clusterSvc,
raftMgr,
cmgManager,
+ replicaMgr,
txManager,
metaStorageMgr,
clusterCfgMgr,
dataStorageManager,
schemaManager,
- tableManager
+ tableManager,
+ indexManager
);
for (IgniteComponent component : otherComponents) {
@@ -668,6 +684,7 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
* Restarts the node which stores some data.
*/
@Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-17986")
public void nodeWithDataTest(TestInfo testInfo) {
Ignite ignite = startNode(testInfo, 0);
@@ -686,6 +703,7 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
* @param testInfo Test information object.
*/
@Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-17986")
public void testTwoNodesRestartDirect(TestInfo testInfo) {
twoNodesRestart(testInfo, true);
}
@@ -777,6 +795,7 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
* @param testInfo Test info.
*/
@Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-17976")
public void testOneNodeRestartWithGap(TestInfo testInfo) throws NodeStoppingException {
Ignite ignite = startNode(testInfo, 0);
@@ -812,6 +831,7 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
* @param testInfo Test info.
*/
@Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-17986 ")
public void testRecoveryOnOneNode(TestInfo testInfo) throws NodeStoppingException {
Ignite ignite = startNode(testInfo, 0);
@@ -838,6 +858,7 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
* @param testInfo Test info.
*/
@Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-17986")
public void testRestartDiffConfig(TestInfo testInfo) throws NodeStoppingException {
Ignite ignite0 = startNode(testInfo, 0);
Ignite ignite1 = startNode(testInfo, 1);
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
index 52322783ae..cd0dac8c5a 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.storage;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.function.BiConsumer;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.util.Cursor;
@@ -198,14 +197,4 @@ public interface MvPartitionStorage extends AutoCloseable {
*/
@Deprecated
long rowsCount() throws StorageException;
-
- /**
- * Iterates over all versions of all entries, except for tombstones.
- *
- * @param consumer Closure to process entries.
- * @deprecated This method was born out of desperation and isn't well-designed. Implementation is not polished either. Currently, it's
- * only usage is to work-around in-memory PK index rebuild on node restart, which shouldn't even exist in the first place.
- */
- @Deprecated
- void forEach(BiConsumer<RowId, BinaryRow> consumer);
}
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
index 9adf2b522b..51600ef38f 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
@@ -18,14 +18,12 @@
package org.apache.ignite.internal.storage.impl;
import java.util.Iterator;
-import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.function.BiConsumer;
import java.util.stream.Stream;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRow;
@@ -386,24 +384,6 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
return map.size();
}
- /** {@inheritDoc} */
- @Override
- public void forEach(BiConsumer<RowId, BinaryRow> consumer) {
- for (Entry<RowId, VersionChain> entry : map.entrySet()) {
- RowId rowId = entry.getKey();
-
- VersionChain versionChain = entry.getValue();
-
- for (VersionChain cur = versionChain; cur != null; cur = cur.next) {
- if (cur.row == null) {
- continue;
- }
-
- consumer.accept(rowId, cur.row);
- }
- }
- }
-
/** {@inheritDoc} */
@Override
public void close() throws Exception {
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index d05f62ed46..4b7a1b66c4 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -27,7 +27,6 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.apache.ignite.configuration.NamedListView;
@@ -698,11 +697,6 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
}
}
- @Override
- public void forEach(BiConsumer<RowId, BinaryRow> consumer) {
- // No-op. Nothing to recover for a volatile storage. See usages and a comment about PK index rebuild.
- }
-
@Override
public void close() {
versionChainTree.close();
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 06862c8091..238fd98802 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -34,7 +34,6 @@ import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.function.BiConsumer;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
import org.apache.ignite.internal.rocksdb.RocksUtils;
@@ -817,37 +816,6 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
}
}
- /** {@inheritDoc} */
- @Override
- public void forEach(BiConsumer<RowId, BinaryRow> consumer) {
- try (
- var upperBound = new Slice(partitionEndPrefix());
- var options = new ReadOptions().setIterateUpperBound(upperBound);
- RocksIterator it = db.newIterator(cf, options)
- ) {
- it.seek(partitionStartPrefix());
-
- while (it.isValid()) {
- byte[] keyBytes = it.key();
- byte[] valueBytes = it.value();
-
- boolean valueHasTxId = keyBytes.length == ROW_PREFIX_SIZE;
-
- if (!isTombstone(valueBytes, valueHasTxId)) {
- ByteBuffer keyBuf = ByteBuffer.wrap(keyBytes).order(KEY_BYTE_ORDER);
-
- RowId rowId = getRowId(keyBuf);
-
- BinaryRow binaryRow = wrapValueIntoBinaryRow(valueBytes, valueHasTxId);
-
- consumer.accept(rowId, binaryRow);
- }
-
- it.next();
- }
- }
- }
-
/**
* Deletes partition data from the storage.
*/
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 3d745be6ea..a07aef9ebe 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -925,7 +925,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
//TODO Revisit peers String representation: https://issues.apache.org/jira/browse/IGNITE-17814
raftGroupOptions.snapshotStorageFactory(new PartitionSnapshotStorageFactory(
raftMgr.topologyService(),
- //TODO IGNITE-17302 Use miniumum from mv storage and tx state storage.
outgoingSnapshotsManager,
new PartitionAccessImpl(partitionKey, mvTableStorage, txStateTableStorage),
peers.stream().map(n -> new Peer(n.address())).map(PeerId::fromPeer).map(Object::toString).collect(Collectors.toList()),
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 4953287a6a..ccd7794b97 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -625,7 +625,7 @@ public class PartitionReplicaListener implements ReplicaListener {
txTimestampUpdateMap.put(txId, fut);
- HybridTimestamp commitTimestamp = hybridClock.now();
+ HybridTimestamp commitTimestamp = commit ? hybridClock.now() : null;
CompletableFuture<Object> changeStateFuture = raftClient.run(
new FinishTxCommand(