You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2022/07/14 06:31:14 UTC
[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #937: IGNITE-16907 Init RAFT group with applied index that corresponds to persisted storage data.
tkalkirill commented on code in PR #937:
URL: https://github.com/apache/ignite-3/pull/937#discussion_r920785270
##########
modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/CommandClosure.java:
##########
@@ -28,6 +28,14 @@
* @see RaftGroupListener
*/
public interface CommandClosure<R extends Command> {
+ /**
+ * Corresponding log index of the command. Present for write commands only.
+ * Returns {@code 0} for read commands.
+ */
+ default long index() {
Review Comment:
What about the name **appliedIndex**?
##########
modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java:
##########
@@ -63,7 +63,7 @@ public FakeInternalTable(String tableName, UUID tableId) {
/** {@inheritDoc} */
@Override
- public @NotNull TableStorage storage() {
+ public @NotNull MvTableStorage storage() {
Review Comment:
```suggestion
public MvTableStorage storage() {
```
##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java:
##########
@@ -30,6 +31,21 @@
* <p>Each MvPartitionStorage instance represents exactly one partition.
*/
public interface MvPartitionStorage extends AutoCloseable {
+ /**
+ * Last known replicator index. {@code 0} if index is unknown.
+ */
+ long appliedIndex();
+
+ /**
+ * Sets the last known replicator index.
+ */
+ void appliedIndex(long appliedIndex) throws StorageException;
+
+ /**
+ * {@link #appliedIndex()} value consistent with the data, already persisted on the storage.
+ */
+ long persistedIndex();
Review Comment:
What about the name **persistedAppliedIndex**?
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java:
##########
@@ -489,9 +491,17 @@ public boolean hasNext() {
public CommandClosure<WriteCommand> next() {
@Nullable CommandClosure<WriteCommand> done = (CommandClosure<WriteCommand>) iter.done();
ByteBuffer data = iter.getData();
- WriteCommand command = JDKMarshaller.DEFAULT.unmarshall(data.array());
+
+ WriteCommand command = done == null ? JDKMarshaller.DEFAULT.unmarshall(data.array()) : done.command();
+
+ long index = iter.getIndex();
return new CommandClosure<>() {
+ @Override
+ public long index() {
+ return index;
+ }
Review Comment:
```suggestion
return new CommandClosure<>() {
/** {@inheritDoc} */
@Override
public long index() {
return index;
}
```
##########
modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestMvPartitionStorage.java:
##########
@@ -74,6 +78,23 @@ public boolean notContainsWriteIntent() {
}
}
+ @Override
Review Comment:
```suggestion
/** {@inheritDoc} */
@Override
```
##########
modules/storage-api/src/test/java/org/apache/ignite/internal/storage/chm/TestMvTableStorage.java:
##########
@@ -65,6 +64,11 @@ public CompletableFuture<?> destroyPartition(int partitionId) throws StorageExce
return CompletableFuture.completedFuture(null);
}
+ @Override
Review Comment:
```suggestion
/** {@inheritDoc} */
@Override
```
##########
modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestMvPartitionStorage.java:
##########
@@ -74,6 +78,23 @@ public boolean notContainsWriteIntent() {
}
}
+ @Override
+ public long appliedIndex() {
+ return appliedIndex;
+ }
+
+ @Override
+ public void appliedIndex(long appliedIndex) throws StorageException {
+ assert appliedIndex > this.appliedIndex;
+
+ this.appliedIndex = appliedIndex;
+ }
+
+ @Override
Review Comment:
```suggestion
/** {@inheritDoc} */
@Override
```
##########
modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestMvPartitionStorage.java:
##########
@@ -74,6 +78,23 @@ public boolean notContainsWriteIntent() {
}
}
+ @Override
+ public long appliedIndex() {
+ return appliedIndex;
+ }
+
+ @Override
Review Comment:
```suggestion
/** {@inheritDoc} */
@Override
```
##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryMvPartitionStorage.java:
##########
@@ -123,6 +132,21 @@ private VersionChainTree createVersionChainTree(
);
}
+ @Override
+ public long appliedIndex() {
+ return appliedIndex;
+ }
+
+ @Override
Review Comment:
```suggestion
/** {@inheritDoc} */
@Override
```
##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryMvPartitionStorage.java:
##########
@@ -123,6 +132,21 @@ private VersionChainTree createVersionChainTree(
);
}
+ @Override
+ public long appliedIndex() {
+ return appliedIndex;
+ }
+
+ @Override
+ public void appliedIndex(long appliedIndex) throws StorageException {
+ this.appliedIndex = appliedIndex;
+ }
+
+ @Override
Review Comment:
```suggestion
/** {@inheritDoc} */
@Override
```
##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryMvPartitionStorage.java:
##########
@@ -123,6 +132,21 @@ private VersionChainTree createVersionChainTree(
);
}
+ @Override
Review Comment:
```suggestion
/** {@inheritDoc} */
@Override
```
##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryMvPartitionStorage.java:
##########
@@ -467,6 +491,21 @@ private Cursor<BinaryRow> internalScan(Predicate<BinaryRow> keyFilter, @Nullable
return new ScanCursor(treeCursor, keyFilter, transactionId, timestamp);
}
+ @Override
Review Comment:
```suggestion
/** {@inheritDoc} */
@Override
```
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -95,30 +98,73 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
/** Partitions column family. */
private final ColumnFamilyHandle cf;
+ /** Meta column family. */
+ private final ColumnFamilyHandle meta;
+
/** Write options. */
private final WriteOptions writeOpts = new WriteOptions();
/** Upper bound for scans and reads. */
private final Slice upperBound;
+ /** Key to store applied index value in meta. */
+ private byte[] appliedIndexKey;
+
/**
* Constructor.
*
* @param partitionId Partition id.
* @param db RocksDB instance.
* @param cf Column family handle to store partition data.
+ * @param meta Column family handle to store partition metadata.
*/
- public RocksDbMvPartitionStorage(int partitionId, RocksDB db, ColumnFamilyHandle cf) {
+ public RocksDbMvPartitionStorage(int partitionId, RocksDB db, ColumnFamilyHandle cf, ColumnFamilyHandle meta) {
this.partitionId = partitionId;
this.db = db;
this.cf = cf;
+ this.meta = meta;
heapKeyBuffer = withInitial(() ->
ByteBuffer.allocate(MAX_KEY_SIZE)
.order(BIG_ENDIAN)
);
upperBound = new Slice(partitionEndPrefix());
+
+ appliedIndexKey = ("index" + partitionId).getBytes(StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public long appliedIndex() {
+ byte[] appliedIndexBytes;
+
+ try {
+
+ appliedIndexBytes = db.get(meta, appliedIndexKey);
+
+ } catch (RocksDBException e) {
+ throw new StorageException(e);
+ }
+
+ if (appliedIndexBytes == null) {
+ return 0;
+ }
+
+ return ByteUtils.bytesToLong(appliedIndexBytes);
+ }
+
+ @Override
+ public void appliedIndex(long appliedIndex) throws StorageException {
+ try {
+ db.put(meta, appliedIndexKey, ByteUtils.longToBytes(appliedIndex));
+ } catch (RocksDBException e) {
+ throw new StorageException(e);
+ }
+ }
+
+ @Override
Review Comment:
```suggestion
/** {@inheritDoc} */
@Override
```
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -95,30 +98,73 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
/** Partitions column family. */
private final ColumnFamilyHandle cf;
+ /** Meta column family. */
+ private final ColumnFamilyHandle meta;
+
/** Write options. */
private final WriteOptions writeOpts = new WriteOptions();
/** Upper bound for scans and reads. */
private final Slice upperBound;
+ /** Key to store applied index value in meta. */
+ private byte[] appliedIndexKey;
+
/**
* Constructor.
*
* @param partitionId Partition id.
* @param db RocksDB instance.
* @param cf Column family handle to store partition data.
+ * @param meta Column family handle to store partition metadata.
*/
- public RocksDbMvPartitionStorage(int partitionId, RocksDB db, ColumnFamilyHandle cf) {
+ public RocksDbMvPartitionStorage(int partitionId, RocksDB db, ColumnFamilyHandle cf, ColumnFamilyHandle meta) {
this.partitionId = partitionId;
this.db = db;
this.cf = cf;
+ this.meta = meta;
heapKeyBuffer = withInitial(() ->
ByteBuffer.allocate(MAX_KEY_SIZE)
.order(BIG_ENDIAN)
);
upperBound = new Slice(partitionEndPrefix());
+
+ appliedIndexKey = ("index" + partitionId).getBytes(StandardCharsets.UTF_8);
+ }
+
+ @Override
Review Comment:
```suggestion
/** {@inheritDoc} */
@Override
```
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -95,30 +98,73 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
/** Partitions column family. */
private final ColumnFamilyHandle cf;
+ /** Meta column family. */
+ private final ColumnFamilyHandle meta;
+
/** Write options. */
private final WriteOptions writeOpts = new WriteOptions();
/** Upper bound for scans and reads. */
private final Slice upperBound;
+ /** Key to store applied index value in meta. */
+ private byte[] appliedIndexKey;
+
/**
* Constructor.
*
* @param partitionId Partition id.
* @param db RocksDB instance.
* @param cf Column family handle to store partition data.
+ * @param meta Column family handle to store partition metadata.
*/
- public RocksDbMvPartitionStorage(int partitionId, RocksDB db, ColumnFamilyHandle cf) {
+ public RocksDbMvPartitionStorage(int partitionId, RocksDB db, ColumnFamilyHandle cf, ColumnFamilyHandle meta) {
this.partitionId = partitionId;
this.db = db;
this.cf = cf;
+ this.meta = meta;
heapKeyBuffer = withInitial(() ->
ByteBuffer.allocate(MAX_KEY_SIZE)
.order(BIG_ENDIAN)
);
upperBound = new Slice(partitionEndPrefix());
+
+ appliedIndexKey = ("index" + partitionId).getBytes(StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public long appliedIndex() {
+ byte[] appliedIndexBytes;
+
+ try {
+
+ appliedIndexBytes = db.get(meta, appliedIndexKey);
+
Review Comment:
```suggestion
appliedIndexBytes = db.get(meta, appliedIndexKey);
```
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -95,30 +98,73 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
/** Partitions column family. */
private final ColumnFamilyHandle cf;
+ /** Meta column family. */
+ private final ColumnFamilyHandle meta;
+
/** Write options. */
private final WriteOptions writeOpts = new WriteOptions();
/** Upper bound for scans and reads. */
private final Slice upperBound;
+ /** Key to store applied index value in meta. */
+ private byte[] appliedIndexKey;
+
/**
* Constructor.
*
* @param partitionId Partition id.
* @param db RocksDB instance.
* @param cf Column family handle to store partition data.
+ * @param meta Column family handle to store partition metadata.
*/
- public RocksDbMvPartitionStorage(int partitionId, RocksDB db, ColumnFamilyHandle cf) {
+ public RocksDbMvPartitionStorage(int partitionId, RocksDB db, ColumnFamilyHandle cf, ColumnFamilyHandle meta) {
this.partitionId = partitionId;
this.db = db;
this.cf = cf;
+ this.meta = meta;
heapKeyBuffer = withInitial(() ->
ByteBuffer.allocate(MAX_KEY_SIZE)
.order(BIG_ENDIAN)
);
upperBound = new Slice(partitionEndPrefix());
+
+ appliedIndexKey = ("index" + partitionId).getBytes(StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public long appliedIndex() {
+ byte[] appliedIndexBytes;
+
+ try {
+
+ appliedIndexBytes = db.get(meta, appliedIndexKey);
+
+ } catch (RocksDBException e) {
+ throw new StorageException(e);
+ }
+
+ if (appliedIndexBytes == null) {
+ return 0;
+ }
+
+ return ByteUtils.bytesToLong(appliedIndexBytes);
+ }
+
+ @Override
Review Comment:
```suggestion
/** {@inheritDoc} */
@Override
```
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -528,6 +574,67 @@ private void incrementRowId(ByteBuffer buf) {
};
}
+ // TODO IGNITE-16769 Implement correct PartitionStorage rows count calculation.
+ @Override
+ public long rowsCount() {
+ try (
+ var upperBound = new Slice(partitionEndPrefix());
+ var options = new ReadOptions().setIterateUpperBound(upperBound);
+ RocksIterator it = db.newIterator(cf, options)
+ ) {
+ it.seek(partitionStartPrefix());
+
+ long size = 0;
+
+ while (it.isValid()) {
+ ++size;
+ it.next();
+ }
+
+ return size;
+ }
+ }
+
+ @Override
Review Comment:
```suggestion
/** {@inheritDoc} */
@Override
```
##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryMvPartitionStorage.java:
##########
@@ -71,6 +72,14 @@ public class PageMemoryMvPartitionStorage implements MvPartitionStorage {
ScanVersionChainByTimestamp::new
);
+ /**
+ * Applied index value.
+ *
+ * @deprecated Not persistent, should be fixed later.
Review Comment:
Later is when and where? add a ticket
##########
modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestMvPartitionStorage.java:
##########
@@ -74,6 +78,23 @@ public boolean notContainsWriteIntent() {
}
}
+ @Override
+ public long appliedIndex() {
+ return appliedIndex;
+ }
+
+ @Override
+ public void appliedIndex(long appliedIndex) throws StorageException {
+ assert appliedIndex > this.appliedIndex;
Review Comment:
```suggestion
assert appliedIndex > this.appliedIndex : "current=" + this.appliedIndex + ", new=" + appliedIndex;
```
##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java:
##########
@@ -118,4 +134,25 @@ public interface MvPartitionStorage extends AutoCloseable {
* @throws StorageException If failed to read data from the storage.
*/
Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, Timestamp timestamp) throws StorageException;
+
+ /**
+ * Returns rows count belongs to current storage.
+ *
+ * @return Rows count.
+ * @throws StorageException If failed to obtain size.
+ * @deprecated It's not yet defined what a "count" is. This value is not easily defined for multiversioned storages.
+ *
+ */
Review Comment:
```suggestion
/**
* Returns rows count belongs to current storage.
*
* @return Rows count.
* @throws StorageException If failed to obtain size.
* @deprecated It's not yet defined what a "count" is. This value is not easily defined for multiversioned storages.
*/
```
##########
modules/table/src/test/java/org/apache/ignite/internal/table/MessagingServiceTestUtils.java:
##########
@@ -88,8 +93,15 @@ public static MessagingService mockMessagingService(
return messagingService;
}
- private static <T extends Command> Iterator<CommandClosure<T>> iterator(T obj) {
+ private static <T extends Command> Iterator<CommandClosure<T>> iterator(T obj, AtomicLong raftIndex) {
+ long index = raftIndex.incrementAndGet();
+
CommandClosure<T> closure = new CommandClosure<>() {
+ @Override
Review Comment:
```suggestion
CommandClosure<T> closure = new CommandClosure<>() {
/** {@inheritDoc} */
@Override
```
##########
modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java:
##########
@@ -109,6 +112,11 @@ public void result(@Nullable Serializable r) {
}
} else {
CommandClosure<WriteCommand> clo = new CommandClosure<>() {
+ @Override
+ public long index() {
Review Comment:
```suggestion
/** {@inheritDoc} */
@Override
public long index() {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org