You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sa...@apache.org on 2022/10/31 11:42:29 UTC

[ignite-3] branch main updated: IGNITE-17975 ItIgniteNodeRestartTest and ItIgniteInMemoryNodeRestartTest unmuted (#1250)

This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new ffd2cd9c18 IGNITE-17975 ItIgniteNodeRestartTest and ItIgniteInMemoryNodeRestartTest unmuted (#1250)
ffd2cd9c18 is described below

commit ffd2cd9c18888181973d3e36bc9d6eb42b864485
Author: Mirza Aliev <al...@gmail.com>
AuthorDate: Mon Oct 31 14:42:23 2022 +0300

    IGNITE-17975 ItIgniteNodeRestartTest and ItIgniteInMemoryNodeRestartTest unmuted (#1250)
---
 .../internal/rocksdb/flush/RocksDbFlusher.java     |  2 +-
 .../app/ItIgniteInMemoryNodeRestartTest.java       |  3 +-
 .../runner/app/ItIgniteNodeRestartTest.java        | 43 ++++++++++++++++------
 .../internal/storage/MvPartitionStorage.java       | 11 ------
 .../storage/impl/TestMvPartitionStorage.java       | 20 ----------
 .../mv/AbstractPageMemoryMvPartitionStorage.java   |  6 ---
 .../storage/rocksdb/RocksDbMvPartitionStorage.java | 32 ----------------
 .../internal/table/distributed/TableManager.java   |  1 -
 .../replicator/PartitionReplicaListener.java       |  2 +-
 9 files changed, 36 insertions(+), 84 deletions(-)

diff --git a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
index 77d69a2449..c139fde2f4 100644
--- a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
+++ b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
@@ -99,7 +99,7 @@ public class RocksDbFlusher {
      * @param delaySupplier Supplier of delay values to batch independent flush requests. When {@link #awaitFlush(boolean)} is called with
      *      {@code true} parameter, the flusher waits given number of milliseconds (using {@code scheduledPool}) and then executes flush
      *      only if there were no other {@code awaitFlush(true)} calls. Otherwise, it does nothing after the timeout. This guarantees that
-     *      either the last one wins, or automatic flush wins if there's an enlless stream of {@code awaitFlush(true)} calls with very small
+     *      either the last one wins, or automatic flush wins if there's an endless stream of {@code awaitFlush(true)} calls with very small
      *      time-intervals between them. Such behavior allows to save on unnecessary flushes when multiple await flush calls appear at
      *      roughly the same time from different threads. For example, several partitions might be flushed at the same time, because they
      *      started at the same time and their flush frequency is also the same.
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
index 7363c5428a..4d39de926c 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
@@ -59,7 +59,6 @@ import org.junit.jupiter.api.TestInfo;
 /**
  * These tests check in-memory node restart scenarios.
  */
-@Disabled("https://issues.apache.org/jira/browse/IGNITE-17302")
 public class ItIgniteInMemoryNodeRestartTest extends IgniteAbstractTest {
     /** Default node port. */
     private static final int DEFAULT_NODE_PORT = 3344;
@@ -179,6 +178,7 @@ public class ItIgniteInMemoryNodeRestartTest extends IgniteAbstractTest {
      * Restarts an in-memory node that is not a leader of the table's partition.
      */
     @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17986")
     public void inMemoryNodeRestartNotLeader(TestInfo testInfo) throws Exception {
         // Start three nodes, the first one is going to be CMG and MetaStorage leader.
         IgniteImpl ignite = startNode(testInfo, 0);
@@ -269,6 +269,7 @@ public class ItIgniteInMemoryNodeRestartTest extends IgniteAbstractTest {
      * Restarts all the nodes with the partition.
      */
     @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17986")
     public void inMemoryNodeFullPartitionRestart(TestInfo testInfo) throws Exception {
         // Start three nodes, the first one is going to be CMG and MetaStorage leader.
         IgniteImpl ignite0 = startNode(testInfo, 0);
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index baaca2d22d..0959648de1 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -38,6 +38,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.ServiceLoader;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.TimeoutException;
@@ -64,6 +65,7 @@ import org.apache.ignite.internal.configuration.testframework.ConfigurationExten
 import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.index.IndexManager;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
@@ -74,7 +76,9 @@ import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.raft.storage.impl.LocalLogStorageFactory;
 import org.apache.ignite.internal.recovery.ConfigurationCatchUpListener;
 import org.apache.ignite.internal.recovery.RecoveryCompletionFutureFactory;
+import org.apache.ignite.internal.replicator.ReplicaManager;
 import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesSerializationRegistryInitializer;
 import org.apache.ignite.internal.schema.SchemaManager;
 import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import org.apache.ignite.internal.storage.DataStorageManager;
@@ -82,12 +86,14 @@ import org.apache.ignite.internal.storage.DataStorageModule;
 import org.apache.ignite.internal.storage.DataStorageModules;
 import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.internal.table.distributed.TableMessageGroup;
 import org.apache.ignite.internal.table.distributed.TableMessagesSerializationRegistryInitializer;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.testframework.WithSystemProperty;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
+import org.apache.ignite.internal.tx.message.TxMessageGroup;
 import org.apache.ignite.internal.tx.message.TxMessagesSerializationRegistryInitializer;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.vault.VaultManager;
@@ -117,7 +123,6 @@ import org.junit.jupiter.api.extension.ExtendWith;
  * These tests check node restart scenarios.
  */
 @WithSystemProperty(key = CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY, value = "0")
-@Disabled("https://issues.apache.org/jira/browse/IGNITE-17302")
 @ExtendWith(ConfigurationExtension.class)
 public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
     /** Default node port. */
@@ -219,6 +224,7 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
         RaftMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
         TxMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
         TableMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
+        ReplicaMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
 
         var clusterLocalConfiguration = new ClusterLocalConfiguration(name, serializationRegistry);
 
@@ -234,7 +240,17 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
 
         var raftMgr = new Loza(clusterSvc, raftConfiguration, dir, hybridClock);
 
-        var txManager = new TxManagerImpl(null, new HeapLockManager(), hybridClock);
+        ReplicaManager replicaMgr = new ReplicaManager(
+                clusterSvc,
+                hybridClock,
+                Set.of(TableMessageGroup.class, TxMessageGroup.class)
+        );
+
+        var replicaService = new ReplicaService(clusterSvc.messagingService(), hybridClock);
+
+        var lockManager = new HeapLockManager();
+
+        var txManager = new TxManagerImpl(replicaService, lockManager, hybridClock);
 
         var cmgManager = new ClusterManagementGroupManager(
                 vault,
@@ -282,18 +298,14 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
 
         SchemaManager schemaManager = new SchemaManager(registry, tblCfg, metaStorageMgr);
 
-        ReplicaService replicaSvc = new ReplicaService(
-                clusterSvc.messagingService(),
-                null);
-
         TableManager tableManager = new TableManager(
                 name,
                 registry,
                 tblCfg,
                 raftMgr,
-                null,
-                null,
-                replicaSvc,
+                replicaMgr,
+                lockManager,
+                replicaService,
                 mock(BaselineManager.class),
                 clusterSvc.topologyService(),
                 txManager,
@@ -302,10 +314,12 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
                 metaStorageMgr,
                 schemaManager,
                 view -> new LocalLogStorageFactory(),
-                null,
+                hybridClock,
                 new OutgoingSnapshotsManager(clusterSvc.messagingService())
         );
 
+        var indexManager = new IndexManager(tblCfg, schemaManager, tableManager);
+
         // Preparing the result map.
 
         partialNode.add(vault);
@@ -335,12 +349,14 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
                 clusterSvc,
                 raftMgr,
                 cmgManager,
+                replicaMgr,
                 txManager,
                 metaStorageMgr,
                 clusterCfgMgr,
                 dataStorageManager,
                 schemaManager,
-                tableManager
+                tableManager,
+                indexManager
         );
 
         for (IgniteComponent component : otherComponents) {
@@ -668,6 +684,7 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
      * Restarts the node which stores some data.
      */
     @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17986")
     public void nodeWithDataTest(TestInfo testInfo) {
         Ignite ignite = startNode(testInfo, 0);
 
@@ -686,6 +703,7 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
      * @param testInfo Test information object.
      */
     @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17986")
     public void testTwoNodesRestartDirect(TestInfo testInfo) {
         twoNodesRestart(testInfo, true);
     }
@@ -777,6 +795,7 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
      * @param testInfo Test info.
      */
     @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17976")
     public void testOneNodeRestartWithGap(TestInfo testInfo) throws NodeStoppingException {
         Ignite ignite = startNode(testInfo, 0);
 
@@ -812,6 +831,7 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
      * @param testInfo Test info.
      */
     @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17986 ")
     public void testRecoveryOnOneNode(TestInfo testInfo) throws NodeStoppingException {
         Ignite ignite = startNode(testInfo, 0);
 
@@ -838,6 +858,7 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
      * @param testInfo Test info.
      */
     @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17986")
     public void testRestartDiffConfig(TestInfo testInfo) throws NodeStoppingException {
         Ignite ignite0 = startNode(testInfo, 0);
         Ignite ignite1 = startNode(testInfo, 1);
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
index 52322783ae..cd0dac8c5a 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.storage;
 
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.BiConsumer;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.util.Cursor;
@@ -198,14 +197,4 @@ public interface MvPartitionStorage extends AutoCloseable {
      */
     @Deprecated
     long rowsCount() throws StorageException;
-
-    /**
-     * Iterates over all versions of all entries, except for tombstones.
-     *
-     * @param consumer Closure to process entries.
-     * @deprecated This method was born out of desperation and isn't well-designed. Implementation is not polished either. Currently, it's
-     *      only usage is to work-around in-memory PK index rebuild on node restart, which shouldn't even exist in the first place.
-     */
-    @Deprecated
-    void forEach(BiConsumer<RowId, BinaryRow> consumer);
 }
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
index 9adf2b522b..51600ef38f 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
@@ -18,14 +18,12 @@
 package org.apache.ignite.internal.storage.impl;
 
 import java.util.Iterator;
-import java.util.Map.Entry;
 import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.function.BiConsumer;
 import java.util.stream.Stream;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.BinaryRow;
@@ -386,24 +384,6 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
         return map.size();
     }
 
-    /** {@inheritDoc} */
-    @Override
-    public void forEach(BiConsumer<RowId, BinaryRow> consumer) {
-        for (Entry<RowId, VersionChain> entry : map.entrySet()) {
-            RowId rowId = entry.getKey();
-
-            VersionChain versionChain = entry.getValue();
-
-            for (VersionChain cur = versionChain; cur != null; cur = cur.next) {
-                if (cur.row == null) {
-                    continue;
-                }
-
-                consumer.accept(rowId, cur.row);
-            }
-        }
-    }
-
     /** {@inheritDoc} */
     @Override
     public void close() throws Exception {
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index d05f62ed46..4b7a1b66c4 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -27,7 +27,6 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.BiConsumer;
 import java.util.function.Predicate;
 import java.util.stream.Stream;
 import org.apache.ignite.configuration.NamedListView;
@@ -698,11 +697,6 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
         }
     }
 
-    @Override
-    public void forEach(BiConsumer<RowId, BinaryRow> consumer) {
-        // No-op. Nothing to recover for a volatile storage. See usages and a comment about PK index rebuild.
-    }
-
     @Override
     public void close() {
         versionChainTree.close();
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 06862c8091..238fd98802 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -34,7 +34,6 @@ import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.BiConsumer;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
 import org.apache.ignite.internal.rocksdb.RocksUtils;
@@ -817,37 +816,6 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
         }
     }
 
-    /** {@inheritDoc} */
-    @Override
-    public void forEach(BiConsumer<RowId, BinaryRow> consumer) {
-        try (
-                var upperBound = new Slice(partitionEndPrefix());
-                var options = new ReadOptions().setIterateUpperBound(upperBound);
-                RocksIterator it = db.newIterator(cf, options)
-        ) {
-            it.seek(partitionStartPrefix());
-
-            while (it.isValid()) {
-                byte[] keyBytes = it.key();
-                byte[] valueBytes = it.value();
-
-                boolean valueHasTxId = keyBytes.length == ROW_PREFIX_SIZE;
-
-                if (!isTombstone(valueBytes, valueHasTxId)) {
-                    ByteBuffer keyBuf = ByteBuffer.wrap(keyBytes).order(KEY_BYTE_ORDER);
-
-                    RowId rowId = getRowId(keyBuf);
-
-                    BinaryRow binaryRow = wrapValueIntoBinaryRow(valueBytes, valueHasTxId);
-
-                    consumer.accept(rowId, binaryRow);
-                }
-
-                it.next();
-            }
-        }
-    }
-
     /**
      * Deletes partition data from the storage.
      */
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 3d745be6ea..a07aef9ebe 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -925,7 +925,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
         //TODO Revisit peers String representation: https://issues.apache.org/jira/browse/IGNITE-17814
         raftGroupOptions.snapshotStorageFactory(new PartitionSnapshotStorageFactory(
                 raftMgr.topologyService(),
-                //TODO IGNITE-17302 Use miniumum from mv storage and tx state storage.
                 outgoingSnapshotsManager,
                 new PartitionAccessImpl(partitionKey, mvTableStorage, txStateTableStorage),
                 peers.stream().map(n -> new Peer(n.address())).map(PeerId::fromPeer).map(Object::toString).collect(Collectors.toList()),
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 4953287a6a..ccd7794b97 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -625,7 +625,7 @@ public class PartitionReplicaListener implements ReplicaListener {
 
         txTimestampUpdateMap.put(txId, fut);
 
-        HybridTimestamp commitTimestamp = hybridClock.now();
+        HybridTimestamp commitTimestamp =  commit ? hybridClock.now() : null;
 
         CompletableFuture<Object> changeStateFuture = raftClient.run(
                 new FinishTxCommand(