Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2022/07/14 06:31:14 UTC

[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #937: IGNITE-16907 Init RAFT group with applied index that corresponds to persisted storage data.

tkalkirill commented on code in PR #937:
URL: https://github.com/apache/ignite-3/pull/937#discussion_r920785270


##########
modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/CommandClosure.java:
##########
@@ -28,6 +28,14 @@
  * @see RaftGroupListener
  */
 public interface CommandClosure<R extends Command> {
+    /**
+     * Corresponding log index of the command. Present for write commands only.
+     * Returns {@code 0} for read commands.
+     */
+    default long index() {

Review Comment:
   What about the name **appliedIndex**?
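
For context on what the new method carries, here is a minimal sketch of the default-method pattern from the hunk above. The `Closure` interface and the `read`/`write` helpers are hypothetical, simplified stand-ins and not the real `CommandClosure` API; only the "0 for read commands" convention is taken from the Javadoc in the diff.

```java
import java.util.List;

public class ClosureIndexSketch {
    /** Hypothetical, simplified stand-in for CommandClosure; not the real Ignite interface. */
    public interface Closure<R> {
        /** Log index of the command; 0 means "no index", as for read commands. */
        default long index() {
            return 0;
        }

        /** The command carried by this closure. */
        R command();
    }

    /** Read closure: inherits the default index of 0. */
    public static Closure<String> read(String cmd) {
        return () -> cmd;
    }

    /** Write closure: overrides index() with the log index it was created for. */
    public static Closure<String> write(String cmd, long logIndex) {
        return new Closure<>() {
            @Override
            public long index() {
                return logIndex;
            }

            @Override
            public String command() {
                return cmd;
            }
        };
    }

    public static void main(String[] args) {
        for (Closure<String> c : List.of(read("get"), write("put", 42L))) {
            System.out.println(c.command() + " -> index " + c.index());
        }
    }
}
```

Whichever name is chosen, the value is only meaningful for write closures; callers of a read closure get the default.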



##########
modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java:
##########
@@ -63,7 +63,7 @@ public FakeInternalTable(String tableName, UUID tableId) {
 
     /** {@inheritDoc} */
     @Override
-    public @NotNull TableStorage storage() {
+    public @NotNull MvTableStorage storage() {

Review Comment:
   ```suggestion
       public MvTableStorage storage() {
   ```



##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java:
##########
@@ -30,6 +31,21 @@
  * <p>Each MvPartitionStorage instance represents exactly one partition.
  */
 public interface MvPartitionStorage extends AutoCloseable {
+    /**
+     * Last known replicator index. {@code 0} if index is unknown.
+     */
+    long appliedIndex();
+
+    /**
+     * Sets the last known replicator index.
+     */
+    void appliedIndex(long appliedIndex) throws StorageException;
+
+    /**
+     * {@link #appliedIndex()} value consistent with the data, already persisted on the storage.
+     */
+    long persistedIndex();

Review Comment:
   What about the name **persistedAppliedIndex**?
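
To illustrate why the two getters can disagree, here is a minimal in-memory sketch. The class name and the `flush()` method are assumptions made for illustration and are not part of the PR; the sketch only models "applied" as the latest value set and "persisted" as the value captured at the last flush.

```java
public class AppliedIndexSketch {
    /** Index of the last command applied in memory. */
    private long appliedIndex;

    /** Index captured by the last flush; stands in for "data already persisted". */
    private long persistedIndex;

    public long appliedIndex() {
        return appliedIndex;
    }

    public void appliedIndex(long appliedIndex) {
        this.appliedIndex = appliedIndex;
    }

    public long persistedIndex() {
        return persistedIndex;
    }

    /** Hypothetical flush: afterwards everything up to appliedIndex counts as persisted. */
    public void flush() {
        persistedIndex = appliedIndex;
    }

    public static void main(String[] args) {
        AppliedIndexSketch storage = new AppliedIndexSketch();

        storage.appliedIndex(5);
        storage.flush();
        storage.appliedIndex(8);

        // Only index 5 is covered by persisted data here; entries 6..8 exist in memory only.
        System.out.println("applied=" + storage.appliedIndex() + ", persisted=" + storage.persistedIndex());
    }
}
```

Under these assumptions, the persisted value is the one a restarted node could safely resume from, which is presumably why the name should make the link to the applied index explicit.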



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java:
##########
@@ -489,9 +491,17 @@ public boolean hasNext() {
                     public CommandClosure<WriteCommand> next() {
                         @Nullable CommandClosure<WriteCommand> done = (CommandClosure<WriteCommand>) iter.done();
                         ByteBuffer data = iter.getData();
-                        WriteCommand command = JDKMarshaller.DEFAULT.unmarshall(data.array());
+
+                        WriteCommand command = done == null ? JDKMarshaller.DEFAULT.unmarshall(data.array()) : done.command();
+
+                        long index = iter.getIndex();
 
                         return new CommandClosure<>() {
+                            @Override
+                            public long index() {
+                                return index;
+                            }

Review Comment:
   ```suggestion
                           return new CommandClosure<>() {
                               /** {@inheritDoc} */
                               @Override
                               public long index() {
                                   return index;
                               }
   ```



##########
modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestMvPartitionStorage.java:
##########
@@ -74,6 +78,23 @@ public boolean notContainsWriteIntent() {
         }
     }
 
+    @Override

Review Comment:
   ```suggestion
       /** {@inheritDoc} */
       @Override
   ```



##########
modules/storage-api/src/test/java/org/apache/ignite/internal/storage/chm/TestMvTableStorage.java:
##########
@@ -65,6 +64,11 @@ public CompletableFuture<?> destroyPartition(int partitionId) throws StorageExce
         return CompletableFuture.completedFuture(null);
     }
 
+    @Override

Review Comment:
   ```suggestion
       /** {@inheritDoc} */
       @Override
   ```



##########
modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestMvPartitionStorage.java:
##########
@@ -74,6 +78,23 @@ public boolean notContainsWriteIntent() {
         }
     }
 
+    @Override
+    public long appliedIndex() {
+        return appliedIndex;
+    }
+
+    @Override
+    public void appliedIndex(long appliedIndex) throws StorageException {
+        assert appliedIndex > this.appliedIndex;
+
+        this.appliedIndex = appliedIndex;
+    }
+
+    @Override

Review Comment:
   ```suggestion
       /** {@inheritDoc} */
       @Override
   ```



##########
modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestMvPartitionStorage.java:
##########
@@ -74,6 +78,23 @@ public boolean notContainsWriteIntent() {
         }
     }
 
+    @Override
+    public long appliedIndex() {
+        return appliedIndex;
+    }
+
+    @Override

Review Comment:
   ```suggestion
       /** {@inheritDoc} */
       @Override
   ```



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryMvPartitionStorage.java:
##########
@@ -123,6 +132,21 @@ private VersionChainTree createVersionChainTree(
         );
     }
 
+    @Override
+    public long appliedIndex() {
+        return appliedIndex;
+    }
+
+    @Override

Review Comment:
   ```suggestion
       /** {@inheritDoc} */
       @Override
   ```



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryMvPartitionStorage.java:
##########
@@ -123,6 +132,21 @@ private VersionChainTree createVersionChainTree(
         );
     }
 
+    @Override
+    public long appliedIndex() {
+        return appliedIndex;
+    }
+
+    @Override
+    public void appliedIndex(long appliedIndex) throws StorageException {
+        this.appliedIndex = appliedIndex;
+    }
+
+    @Override

Review Comment:
   ```suggestion
       /** {@inheritDoc} */
       @Override
   ```



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryMvPartitionStorage.java:
##########
@@ -123,6 +132,21 @@ private VersionChainTree createVersionChainTree(
         );
     }
 
+    @Override

Review Comment:
   ```suggestion
       /** {@inheritDoc} */
       @Override
   ```



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryMvPartitionStorage.java:
##########
@@ -467,6 +491,21 @@ private Cursor<BinaryRow> internalScan(Predicate<BinaryRow> keyFilter, @Nullable
         return new ScanCursor(treeCursor, keyFilter, transactionId, timestamp);
     }
 
+    @Override

Review Comment:
   ```suggestion
       /** {@inheritDoc} */
       @Override
   ```



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -95,30 +98,73 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
     /** Partitions column family. */
     private final ColumnFamilyHandle cf;
 
+    /** Meta column family. */
+    private final ColumnFamilyHandle meta;
+
     /** Write options. */
     private final WriteOptions writeOpts = new WriteOptions();
 
     /** Upper bound for scans and reads. */
     private final Slice upperBound;
 
+    /** Key to store applied index value in meta. */
+    private byte[] appliedIndexKey;
+
     /**
      * Constructor.
      *
      * @param partitionId Partition id.
      * @param db RocksDB instance.
      * @param cf Column family handle to store partition data.
+     * @param meta Column family handle to store partition metadata.
      */
-    public RocksDbMvPartitionStorage(int partitionId, RocksDB db, ColumnFamilyHandle cf) {
+    public RocksDbMvPartitionStorage(int partitionId, RocksDB db, ColumnFamilyHandle cf, ColumnFamilyHandle meta) {
         this.partitionId = partitionId;
         this.db = db;
         this.cf = cf;
+        this.meta = meta;
 
         heapKeyBuffer = withInitial(() ->
                 ByteBuffer.allocate(MAX_KEY_SIZE)
                         .order(BIG_ENDIAN)
         );
 
         upperBound = new Slice(partitionEndPrefix());
+
+        appliedIndexKey = ("index" + partitionId).getBytes(StandardCharsets.UTF_8);
+    }
+
+    @Override
+    public long appliedIndex() {
+        byte[] appliedIndexBytes;
+
+        try {
+
+            appliedIndexBytes = db.get(meta, appliedIndexKey);
+
+        } catch (RocksDBException e) {
+            throw new StorageException(e);
+        }
+
+        if (appliedIndexBytes == null) {
+            return 0;
+        }
+
+        return ByteUtils.bytesToLong(appliedIndexBytes);
+    }
+
+    @Override
+    public void appliedIndex(long appliedIndex) throws StorageException {
+        try {
+            db.put(meta, appliedIndexKey, ByteUtils.longToBytes(appliedIndex));
+        } catch (RocksDBException e) {
+            throw new StorageException(e);
+        }
+    }
+
+    @Override

Review Comment:
   ```suggestion
       /** {@inheritDoc} */
       @Override
   ```



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -95,30 +98,73 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
     /** Partitions column family. */
     private final ColumnFamilyHandle cf;
 
+    /** Meta column family. */
+    private final ColumnFamilyHandle meta;
+
     /** Write options. */
     private final WriteOptions writeOpts = new WriteOptions();
 
     /** Upper bound for scans and reads. */
     private final Slice upperBound;
 
+    /** Key to store applied index value in meta. */
+    private byte[] appliedIndexKey;
+
     /**
      * Constructor.
      *
      * @param partitionId Partition id.
      * @param db RocksDB instance.
      * @param cf Column family handle to store partition data.
+     * @param meta Column family handle to store partition metadata.
      */
-    public RocksDbMvPartitionStorage(int partitionId, RocksDB db, ColumnFamilyHandle cf) {
+    public RocksDbMvPartitionStorage(int partitionId, RocksDB db, ColumnFamilyHandle cf, ColumnFamilyHandle meta) {
         this.partitionId = partitionId;
         this.db = db;
         this.cf = cf;
+        this.meta = meta;
 
         heapKeyBuffer = withInitial(() ->
                 ByteBuffer.allocate(MAX_KEY_SIZE)
                         .order(BIG_ENDIAN)
         );
 
         upperBound = new Slice(partitionEndPrefix());
+
+        appliedIndexKey = ("index" + partitionId).getBytes(StandardCharsets.UTF_8);
+    }
+
+    @Override

Review Comment:
   ```suggestion
       /** {@inheritDoc} */
       @Override
   ```



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -95,30 +98,73 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
     /** Partitions column family. */
     private final ColumnFamilyHandle cf;
 
+    /** Meta column family. */
+    private final ColumnFamilyHandle meta;
+
     /** Write options. */
     private final WriteOptions writeOpts = new WriteOptions();
 
     /** Upper bound for scans and reads. */
     private final Slice upperBound;
 
+    /** Key to store applied index value in meta. */
+    private byte[] appliedIndexKey;
+
     /**
      * Constructor.
      *
      * @param partitionId Partition id.
      * @param db RocksDB instance.
      * @param cf Column family handle to store partition data.
+     * @param meta Column family handle to store partition metadata.
      */
-    public RocksDbMvPartitionStorage(int partitionId, RocksDB db, ColumnFamilyHandle cf) {
+    public RocksDbMvPartitionStorage(int partitionId, RocksDB db, ColumnFamilyHandle cf, ColumnFamilyHandle meta) {
         this.partitionId = partitionId;
         this.db = db;
         this.cf = cf;
+        this.meta = meta;
 
         heapKeyBuffer = withInitial(() ->
                 ByteBuffer.allocate(MAX_KEY_SIZE)
                         .order(BIG_ENDIAN)
         );
 
         upperBound = new Slice(partitionEndPrefix());
+
+        appliedIndexKey = ("index" + partitionId).getBytes(StandardCharsets.UTF_8);
+    }
+
+    @Override
+    public long appliedIndex() {
+        byte[] appliedIndexBytes;
+
+        try {
+
+            appliedIndexBytes = db.get(meta, appliedIndexKey);
+

Review Comment:
   ```suggestion
               appliedIndexBytes = db.get(meta, appliedIndexKey);
   ```
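
As a side note on the read path above, here is a small sketch of the key construction and value decoding without RocksDB. It assumes that `ByteUtils.longToBytes` and `ByteUtils.bytesToLong` use an 8-byte big-endian layout, which the diff does not show; the helper names below are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class AppliedIndexCodecSketch {
    /** Builds the metadata key the same way the constructor in the diff does. */
    public static byte[] appliedIndexKey(int partitionId) {
        return ("index" + partitionId).getBytes(StandardCharsets.UTF_8);
    }

    /** Assumed equivalent of ByteUtils.longToBytes: 8 bytes, big-endian. */
    public static byte[] longToBytes(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    /** Decodes a stored value; a missing value (null) maps to 0, i.e. "index unknown". */
    public static long decodeAppliedIndex(byte[] stored) {
        return stored == null ? 0L : ByteBuffer.wrap(stored).getLong();
    }

    public static void main(String[] args) {
        System.out.println(new String(appliedIndexKey(3), StandardCharsets.UTF_8));
        System.out.println(decodeAppliedIndex(longToBytes(42L)));
        System.out.println(decodeAppliedIndex(null));
    }
}
```

The `null` branch mirrors the `return 0` in the hunk: a partition that has never stored an index reports it as unknown.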



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -95,30 +98,73 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
     /** Partitions column family. */
     private final ColumnFamilyHandle cf;
 
+    /** Meta column family. */
+    private final ColumnFamilyHandle meta;
+
     /** Write options. */
     private final WriteOptions writeOpts = new WriteOptions();
 
     /** Upper bound for scans and reads. */
     private final Slice upperBound;
 
+    /** Key to store applied index value in meta. */
+    private byte[] appliedIndexKey;
+
     /**
      * Constructor.
      *
      * @param partitionId Partition id.
      * @param db RocksDB instance.
      * @param cf Column family handle to store partition data.
+     * @param meta Column family handle to store partition metadata.
      */
-    public RocksDbMvPartitionStorage(int partitionId, RocksDB db, ColumnFamilyHandle cf) {
+    public RocksDbMvPartitionStorage(int partitionId, RocksDB db, ColumnFamilyHandle cf, ColumnFamilyHandle meta) {
         this.partitionId = partitionId;
         this.db = db;
         this.cf = cf;
+        this.meta = meta;
 
         heapKeyBuffer = withInitial(() ->
                 ByteBuffer.allocate(MAX_KEY_SIZE)
                         .order(BIG_ENDIAN)
         );
 
         upperBound = new Slice(partitionEndPrefix());
+
+        appliedIndexKey = ("index" + partitionId).getBytes(StandardCharsets.UTF_8);
+    }
+
+    @Override
+    public long appliedIndex() {
+        byte[] appliedIndexBytes;
+
+        try {
+
+            appliedIndexBytes = db.get(meta, appliedIndexKey);
+
+        } catch (RocksDBException e) {
+            throw new StorageException(e);
+        }
+
+        if (appliedIndexBytes == null) {
+            return 0;
+        }
+
+        return ByteUtils.bytesToLong(appliedIndexBytes);
+    }
+
+    @Override

Review Comment:
   ```suggestion
       /** {@inheritDoc} */
       @Override
   ```



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -528,6 +574,67 @@ private void incrementRowId(ByteBuffer buf) {
         };
     }
 
+    // TODO IGNITE-16769 Implement correct PartitionStorage rows count calculation.
+    @Override
+    public long rowsCount() {
+        try (
+                var upperBound = new Slice(partitionEndPrefix());
+                var options = new ReadOptions().setIterateUpperBound(upperBound);
+                RocksIterator it = db.newIterator(cf, options)
+        ) {
+            it.seek(partitionStartPrefix());
+
+            long size = 0;
+
+            while (it.isValid()) {
+                ++size;
+                it.next();
+            }
+
+            return size;
+        }
+    }
+
+    @Override

Review Comment:
   ```suggestion
       /** {@inheritDoc} */
       @Override
   ```



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryMvPartitionStorage.java:
##########
@@ -71,6 +72,14 @@ public class PageMemoryMvPartitionStorage implements MvPartitionStorage {
             ScanVersionChainByTimestamp::new
     );
 
+    /**
+     * Applied index value.
+     *
+     * @deprecated Not persistent, should be fixed later.

Review Comment:
   When and where is "later"? Please add a ticket.



##########
modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestMvPartitionStorage.java:
##########
@@ -74,6 +78,23 @@ public boolean notContainsWriteIntent() {
         }
     }
 
+    @Override
+    public long appliedIndex() {
+        return appliedIndex;
+    }
+
+    @Override
+    public void appliedIndex(long appliedIndex) throws StorageException {
+        assert appliedIndex > this.appliedIndex;

Review Comment:
   ```suggestion
           assert appliedIndex > this.appliedIndex : "current=" + this.appliedIndex + ", new=" + appliedIndex;
   ```
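
A self-contained sketch of what the suggested message buys when the check fails. The class name is hypothetical; note that Java only evaluates `assert` when the JVM runs with assertions enabled (`-ea`), so the sketch handles both cases.

```java
public class AssertMessageSketch {
    private long appliedIndex;

    public void appliedIndex(long newIndex) {
        // With assertions enabled (java -ea), a failure reports both values.
        assert newIndex > appliedIndex : "current=" + appliedIndex + ", new=" + newIndex;

        appliedIndex = newIndex;
    }

    public long appliedIndex() {
        return appliedIndex;
    }

    public static void main(String[] args) {
        AssertMessageSketch storage = new AssertMessageSketch();
        storage.appliedIndex(3);

        try {
            storage.appliedIndex(3); // Not greater than the current value.
            System.out.println("assertions disabled, no check performed");
        } catch (AssertionError e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With `-ea`, the caught error carries both the current and the new index, so a failing test log shows which update went backwards without a debugger.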



##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java:
##########
@@ -118,4 +134,25 @@ public interface MvPartitionStorage extends AutoCloseable {
      * @throws StorageException If failed to read data from the storage.
      */
     Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, Timestamp timestamp) throws StorageException;
+
+    /**
+     * Returns rows count belongs to current storage.
+     *
+     * @return Rows count.
+     * @throws StorageException If failed to obtain size.
+     * @deprecated It's not yet defined what a "count" is. This value is not easily defined for multiversioned storages.
+     *
+     */

Review Comment:
   ```suggestion
   
       /**
        * Returns rows count belongs to current storage.
        *
        * @return Rows count.
        * @throws StorageException If failed to obtain size.
        * @deprecated It's not yet defined what a "count" is. This value is not easily defined for multiversioned storages.
        */
   ```



##########
modules/table/src/test/java/org/apache/ignite/internal/table/MessagingServiceTestUtils.java:
##########
@@ -88,8 +93,15 @@ public static MessagingService mockMessagingService(
         return messagingService;
     }
 
-    private static <T extends Command> Iterator<CommandClosure<T>> iterator(T obj) {
+    private static <T extends Command> Iterator<CommandClosure<T>> iterator(T obj, AtomicLong raftIndex) {
+        long index = raftIndex.incrementAndGet();
+
         CommandClosure<T> closure = new CommandClosure<>() {
+            @Override

Review Comment:
   ```suggestion
           CommandClosure<T> closure = new CommandClosure<>() {
               /** {@inheritDoc} */
               @Override
   ```



##########
modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java:
##########
@@ -109,6 +112,11 @@ public void result(@Nullable Serializable r) {
                                     }
                                 } else {
                                     CommandClosure<WriteCommand> clo = new CommandClosure<>() {
+                                        @Override
+                                        public long index() {

Review Comment:
   ```suggestion
                                           /** {@inheritDoc} */
                                           @Override
                                           public long index() {
   ```


