You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vp...@apache.org on 2023/10/06 09:53:28 UTC
[ignite-3] 01/01: Change after review from Kirill T.
This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch ignite-20435
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit cc29ed6cbb165a5581d16fe5a9333835000282a5
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Fri Oct 6 12:52:47 2023 +0300
Change after review from Kirill T.
---
.../ignite/client/fakes/FakeInternalTable.java | 27 --------
.../ignite/internal/table/ItInternalTableTest.java | 72 ++++++++++++++--------
.../ignite/internal/table/InternalTable.java | 18 ++++--
.../distributed/storage/InternalTableImpl.java | 4 +-
4 files changed, 60 insertions(+), 61 deletions(-)
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
index 8dc644ba90..02e382a279 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
@@ -79,13 +79,11 @@ public class FakeInternalTable implements InternalTable {
this.keyExtractor = keyExtractor;
}
- /** {@inheritDoc} */
@Override
public MvTableStorage storage() {
throw new UnsupportedOperationException("Not implemented yet");
}
- /** {@inheritDoc} */
@Override
public int partitions() {
return 1;
@@ -96,19 +94,16 @@ public class FakeInternalTable implements InternalTable {
return tableId;
}
- /** {@inheritDoc} */
@Override
public String name() {
return tableName;
}
- /** {@inheritDoc} */
@Override
public int partitionId(BinaryRowEx row) {
return 0;
}
- /** {@inheritDoc} */
@Override
public CompletableFuture<BinaryRow> get(BinaryRowEx keyRow, @Nullable InternalTransaction tx) {
return completedFuture(getImpl(keyRow.tupleSlice(), keyRow));
@@ -128,7 +123,6 @@ public class FakeInternalTable implements InternalTable {
return data.get(key);
}
- /** {@inheritDoc} */
@Override
public CompletableFuture<List<BinaryRow>> getAll(Collection<BinaryRowEx> keyRows, @Nullable InternalTransaction tx) {
var res = new ArrayList<BinaryRow>();
@@ -156,7 +150,6 @@ public class FakeInternalTable implements InternalTable {
return null;
}
- /** {@inheritDoc} */
@Override
public CompletableFuture<Void> upsert(BinaryRowEx row, @Nullable InternalTransaction tx) {
upsertImpl(keyExtractor.extractColumns(row), row);
@@ -170,7 +163,6 @@ public class FakeInternalTable implements InternalTable {
data.put(key.byteBuffer(), row);
}
- /** {@inheritDoc} */
@Override
public CompletableFuture<Void> upsertAll(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx) {
for (var row : rows) {
@@ -181,13 +173,11 @@ public class FakeInternalTable implements InternalTable {
return completedFuture(null);
}
- /** {@inheritDoc} */
@Override
public CompletableFuture<Void> upsertAll(Collection<BinaryRowEx> rows, int partition) {
throw new UnsupportedOperationException();
}
- /** {@inheritDoc} */
@Override
public CompletableFuture<BinaryRow> getAndUpsert(BinaryRowEx row,
@Nullable InternalTransaction tx) {
@@ -202,7 +192,6 @@ public class FakeInternalTable implements InternalTable {
return completedFuture(res);
}
- /** {@inheritDoc} */
@Override
public CompletableFuture<Boolean> insert(BinaryRowEx row, @Nullable InternalTransaction tx) {
BinaryTuple key = keyExtractor.extractColumns(row);
@@ -218,7 +207,6 @@ public class FakeInternalTable implements InternalTable {
return completedFuture(old == null);
}
- /** {@inheritDoc} */
@Override
public CompletableFuture<List<BinaryRow>> insertAll(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx) {
var skipped = new ArrayList<BinaryRow>();
@@ -233,7 +221,6 @@ public class FakeInternalTable implements InternalTable {
return completedFuture(skipped);
}
- /** {@inheritDoc} */
@Override
public CompletableFuture<Boolean> replace(BinaryRowEx row, @Nullable InternalTransaction tx) {
BinaryTuple key = keyExtractor.extractColumns(row);
@@ -241,7 +228,6 @@ public class FakeInternalTable implements InternalTable {
return completedFuture(replaceImpl(key, row, tx) != null);
}
- /** {@inheritDoc} */
@Override
public CompletableFuture<Boolean> replace(BinaryRowEx oldRow, BinaryRowEx newRow, @Nullable InternalTransaction tx) {
BinaryTuple key = keyExtractor.extractColumns(oldRow);
@@ -275,7 +261,6 @@ public class FakeInternalTable implements InternalTable {
return old;
}
- /** {@inheritDoc} */
@Override
public CompletableFuture<BinaryRow> getAndReplace(BinaryRowEx row, @Nullable InternalTransaction tx) {
BinaryTuple key = keyExtractor.extractColumns(row);
@@ -287,7 +272,6 @@ public class FakeInternalTable implements InternalTable {
return completedFuture(replace);
}
- /** {@inheritDoc} */
@Override
public CompletableFuture<Boolean> delete(BinaryRowEx keyRow, @Nullable InternalTransaction tx) {
BinaryRow old = getImpl(keyRow.tupleSlice(), keyRow);
@@ -300,7 +284,6 @@ public class FakeInternalTable implements InternalTable {
return completedFuture(old != null);
}
- /** {@inheritDoc} */
@Override
public CompletableFuture<Boolean> deleteExact(BinaryRowEx oldRow, @Nullable InternalTransaction tx) {
var res = false;
@@ -318,7 +301,6 @@ public class FakeInternalTable implements InternalTable {
return completedFuture(res);
}
- /** {@inheritDoc} */
@Override
public CompletableFuture<BinaryRow> getAndDelete(BinaryRowEx row, @Nullable InternalTransaction tx) {
BinaryRow old = getImpl(row.tupleSlice(), row);
@@ -331,7 +313,6 @@ public class FakeInternalTable implements InternalTable {
return completedFuture(old);
}
- /** {@inheritDoc} */
@Override
public CompletableFuture<List<BinaryRow>> deleteAll(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx) {
var skipped = new ArrayList<BinaryRow>();
@@ -346,7 +327,6 @@ public class FakeInternalTable implements InternalTable {
return completedFuture(skipped);
}
- /** {@inheritDoc} */
@Override
public CompletableFuture<List<BinaryRow>> deleteAllExact(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx) {
var skipped = new ArrayList<BinaryRow>();
@@ -434,42 +414,35 @@ public class FakeInternalTable implements InternalTable {
throw new IgniteInternalException(new OperationNotSupportedException());
}
- /** {@inheritDoc} */
@Override
public List<String> assignments() {
throw new IgniteInternalException(new OperationNotSupportedException());
}
- /** {@inheritDoc} */
@Override
public CompletableFuture<List<PrimaryReplica>> primaryReplicas() {
return CompletableFuture.failedFuture(new IgniteInternalException(new OperationNotSupportedException()));
}
- /** {@inheritDoc} */
@Override
public ClusterNode leaderAssignment(int partition) {
throw new IgniteInternalException(new OperationNotSupportedException());
}
- /** {@inheritDoc} */
@Override
public RaftGroupService partitionRaftGroupService(int partition) {
return null;
}
- /** {@inheritDoc} */
@Override public TxStateTableStorage txStateStorage() {
return null;
}
- /** {@inheritDoc} */
@Override
public int partition(BinaryRowEx keyRow) {
return 0;
}
- /** {@inheritDoc} */
@Override
public void close() {
// No-op.
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItInternalTableTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItInternalTableTest.java
index d104acf32f..dd98217ad1 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItInternalTableTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItInternalTableTest.java
@@ -32,6 +32,7 @@ import static org.junit.jupiter.api.Assertions.fail;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -39,6 +40,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.TimeUnit;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgnitionManager;
import org.apache.ignite.InitParameters;
@@ -360,29 +362,33 @@ public class ItInternalTableTest extends BaseIgniteAbstractTest {
}
@Test
- public void getAllOrderTest() throws Exception {
- var keyRows = populateEvenKeysAndPrepareEntriesToLookup(true);
+ public void getAllOrderTest() {
+ List<BinaryRowEx> keyRows = populateEvenKeysAndPrepareEntriesToLookup(true);
InternalTable internalTable = ((TableImpl) table).internalTable();
- var schemaDescriptor = ((TableImpl) table).schemaView().schema();
+ SchemaDescriptor schemaDescriptor = ((TableImpl) table).schemaView().schema();
- List<BinaryRow> res = internalTable.getAll(keyRows, null).get();
+ CompletableFuture<List<BinaryRow>> getAllFut = internalTable.getAll(keyRows, null);
+
+ assertThat(getAllFut, willCompleteSuccessfully());
+
+ List<BinaryRow> res = getAllFut.join();
assertEquals(keyRows.size(), res.size());
- var resIter = res.iterator();
+ Iterator<BinaryRow> resIter = res.iterator();
for (BinaryRowEx key : keyRows) {
int i = TableRow.keyTuple(Row.wrapKeyOnlyBinaryRow(schemaDescriptor, key)).<Long>value("key").intValue();
- var resRow = resIter.next();
+ BinaryRow resRow = resIter.next();
if (i % 2 == 1) {
assertNull(resRow);
} else {
assertNotNull(resRow);
- var rowTuple = TableRow.tuple(Row.wrapBinaryRow(schemaDescriptor, resRow));
+ Tuple rowTuple = TableRow.tuple(Row.wrapBinaryRow(schemaDescriptor, resRow));
assertEquals(i % 100L, rowTuple.<Long>value("key"));
assertEquals(i, rowTuple.<Integer>value("valInt"));
@@ -392,21 +398,25 @@ public class ItInternalTableTest extends BaseIgniteAbstractTest {
}
@Test
- public void deleteAllOrderTest() throws Exception {
- var keyRows = populateEvenKeysAndPrepareEntriesToLookup(true);
+ public void deleteAllOrderTest() {
+ List<BinaryRowEx> keyRows = populateEvenKeysAndPrepareEntriesToLookup(true);
InternalTable internalTable = ((TableImpl) table).internalTable();
- var schemaDescriptor = ((TableImpl) table).schemaView().schema();
+ SchemaDescriptor schemaDescriptor = ((TableImpl) table).schemaView().schema();
+
+ CompletableFuture<List<BinaryRow>> deleteAllFut = internalTable.deleteAll(keyRows, null);
+
+ assertThat(deleteAllFut, willCompleteSuccessfully());
- List<BinaryRow> res = internalTable.deleteAll(keyRows, null).get();
+ List<BinaryRow> res = deleteAllFut.join();
- var resIter = res.iterator();
+ Iterator<BinaryRow> resIter = res.iterator();
for (BinaryRowEx key : keyRows) {
int i = TableRow.keyTuple(Row.wrapKeyOnlyBinaryRow(schemaDescriptor, key)).<Long>value("key").intValue();
if (i % 2 == 1) {
- var rowTuple = TableRow.keyTuple(Row.wrapKeyOnlyBinaryRow(schemaDescriptor, resIter.next()));
+ Tuple rowTuple = TableRow.keyTuple(Row.wrapKeyOnlyBinaryRow(schemaDescriptor, resIter.next()));
assertEquals(i % 100L, rowTuple.<Long>value("key"));
}
@@ -414,21 +424,25 @@ public class ItInternalTableTest extends BaseIgniteAbstractTest {
}
@Test
- public void deleteAllExactOrderTest() throws Exception {
- var rowsToLookup = populateEvenKeysAndPrepareEntriesToLookup(false);
+ public void deleteAllExactOrderTest() {
+ List<BinaryRowEx> rowsToLookup = populateEvenKeysAndPrepareEntriesToLookup(false);
InternalTable internalTable = ((TableImpl) table).internalTable();
- var schemaDescriptor = ((TableImpl) table).schemaView().schema();
+ SchemaDescriptor schemaDescriptor = ((TableImpl) table).schemaView().schema();
- List<BinaryRow> res = internalTable.deleteAllExact(rowsToLookup, null).get();
+ CompletableFuture<List<BinaryRow>> deleteAllExactFut = internalTable.deleteAllExact(rowsToLookup, null);
- var resIter = res.iterator();
+ assertThat(deleteAllExactFut, willCompleteSuccessfully());
+
+ List<BinaryRow> res = deleteAllExactFut.join();
+
+ Iterator<BinaryRow> resIter = res.iterator();
for (BinaryRowEx key : rowsToLookup) {
int i = TableRow.tuple(Row.wrapBinaryRow(schemaDescriptor, key)).<Long>value("key").intValue();
if (i % 2 == 1) {
- var rowTuple = TableRow.tuple(Row.wrapBinaryRow(schemaDescriptor, resIter.next()));
+ Tuple rowTuple = TableRow.tuple(Row.wrapBinaryRow(schemaDescriptor, resIter.next()));
assertEquals(i % 100L, rowTuple.<Long>value("key"));
assertEquals(i, rowTuple.<Integer>value("valInt"));
@@ -438,21 +452,25 @@ public class ItInternalTableTest extends BaseIgniteAbstractTest {
}
@Test
- public void insertAllOrderTest() throws Exception {
- var rowsToLookup = populateEvenKeysAndPrepareEntriesToLookup(false);
+ public void insertAllOrderTest() {
+ List<BinaryRowEx> rowsToLookup = populateEvenKeysAndPrepareEntriesToLookup(false);
InternalTable internalTable = ((TableImpl) table).internalTable();
- var schemaDescriptor = ((TableImpl) table).schemaView().schema();
+ SchemaDescriptor schemaDescriptor = ((TableImpl) table).schemaView().schema();
+
+ CompletableFuture<List<BinaryRow>> insertAllFut = internalTable.insertAll(rowsToLookup, null);
+
+ assertThat(insertAllFut, willCompleteSuccessfully());
- List<BinaryRow> res = internalTable.insertAll(rowsToLookup, null).get();
+ List<BinaryRow> res = insertAllFut.join();
- var resIter = res.iterator();
+ Iterator<BinaryRow> resIter = res.iterator();
for (BinaryRowEx key : rowsToLookup) {
int i = TableRow.tuple(Row.wrapBinaryRow(schemaDescriptor, key)).<Long>value("key").intValue();
if (i % 2 == 0) {
- var rowTuple = TableRow.tuple(Row.wrapBinaryRow(schemaDescriptor, resIter.next()));
+ Tuple rowTuple = TableRow.tuple(Row.wrapBinaryRow(schemaDescriptor, resIter.next()));
assertEquals(i % 100L, rowTuple.<Long>value("key"));
assertEquals(i, rowTuple.<Integer>value("valInt"));
@@ -484,7 +502,7 @@ public class ItInternalTableTest extends BaseIgniteAbstractTest {
KeyValueView<Tuple, Tuple> keyValueView = table.keyValueView();
- var retrievedItems = scanAllPartitions(node);
+ List<BinaryRow> retrievedItems = scanAllPartitions(node);
assertEquals(0, retrievedItems.size());
@@ -538,7 +556,7 @@ public class ItInternalTableTest extends BaseIgniteAbstractTest {
});
}
- subscriberAllDataAwaitLatch.await();
+ subscriberAllDataAwaitLatch.await(10, TimeUnit.SECONDS);
return retrievedItems;
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
index 528dfaf604..1b588578da 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
@@ -178,8 +178,10 @@ public interface InternalTable extends ManuallyCloseable {
* Asynchronously insert rows into the table which do not exist, skipping existed ones.
*
* @param rows Rows to insert into the table.
- * @param tx The transaction.
- * @return Future representing pending completion of the operation.
+ * @param tx The transaction.
+ * @return Future represents the pending completion of the operation, with rejected rows for insertion in the result. The order of
+ * collection elements is guaranteed to be the same as the order of {@code rows}. If a record is inserted, the element will be
+ * excluded from the collection result.
*/
CompletableFuture<List<BinaryRow>> insertAll(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx);
@@ -243,8 +245,10 @@ public interface InternalTable extends ManuallyCloseable {
* Asynchronously remove rows with the same key columns values as the given one has from the table.
*
* @param rows Rows with key columns set.
- * @param tx The transaction.
- * @return Future representing pending completion of the operation.
+ * @param tx The transaction.
+ * @return Future represents the pending completion of the operation, with rejected rows for deletion in the result. The order of
+ * collection elements is guaranteed to be the same as the order of {@code rows}. If a record is deleted, the element will be
+ * excluded from the collection result.
*/
CompletableFuture<List<BinaryRow>> deleteAll(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx);
@@ -252,8 +256,10 @@ public interface InternalTable extends ManuallyCloseable {
* Asynchronously remove given rows from the table.
*
* @param rows Rows to delete.
- * @param tx The transaction.
- * @return Future representing pending completion of the operation.
+ * @param tx The transaction.
+ * @return Future represents the pending completion of the operation, with rejected rows for deletion in the result. The order of
+ * collection elements is guaranteed to be the same as the order of {@code rows}. If a record is deleted, the element will be
+ * excluded from the collection result.
*/
CompletableFuture<List<BinaryRow>> deleteAllExact(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 9f42f49760..9f3526d35a 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -1480,7 +1480,9 @@ public class InternalTableImpl implements InternalTable {
List<BinaryRow> result = new ArrayList<>();
List<Boolean> response = (List<Boolean>) batch.getCompletedResult();
- assert batch.requestedRows.size() == response.size();
+ assert batch.requestedRows.size() == response.size() :
+ "Replication response does not fit to request [requestRows=" + batch.requestedRows.size()
+ + "responseRows=" + response.size() + ']';
for (int i = 0; i < response.size(); i++) {
result.add(response.get(i) ? null : batch.requestedRows.get(i));