You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/05/25 15:58:28 UTC

[GitHub] [ignite-3] alievmirza commented on a change in pull request #118: IGNITE-14239 Raft based implementation of atomic protocol

alievmirza commented on a change in pull request #118:
URL: https://github.com/apache/ignite-3/pull/118#discussion_r638750335



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
##########
@@ -71,13 +85,37 @@ public InternalTableImpl(
 
     /** {@inheritDoc} */
     @Override public @NotNull CompletableFuture<BinaryRow> get(BinaryRow keyRow) {
-        return partitionMap.get(keyRow.hash() % partitions).<KVGetResponse>run(new GetCommand(keyRow))
-            .thenApply(KVGetResponse::getValue);
+        return partitionMap.get(keyRow.hash() % partitions).<SingleRowResponse>run(new GetCommand(keyRow))
+            .thenApply(response -> response.getValue());

Review comment:
       Here and below, this could be simplified to `.thenApply(SingleRowResponse::getValue);`

##########
File path: modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
##########
@@ -0,0 +1,705 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.Row;
+import org.apache.ignite.internal.schema.RowAssembler;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAllCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndDeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndUpsertCommand;
+import org.apache.ignite.internal.table.distributed.command.GetCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertAllCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertCommand;
+import org.apache.ignite.internal.table.distributed.command.ReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.ReplaceIfExistCommand;
+import org.apache.ignite.internal.table.distributed.command.UpsertAllCommand;
+import org.apache.ignite.internal.table.distributed.command.UpsertCommand;
+import org.apache.ignite.internal.table.distributed.command.response.MultiRowsResponse;
+import org.apache.ignite.internal.table.distributed.command.response.SingleRowResponse;
+import org.apache.ignite.raft.client.Command;
+import org.apache.ignite.raft.client.service.CommandClosure;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * There are a tests for a table command listener.
+ */
+public class PartitionCommandListenerTest {
+    /** Key count. */
+    public static final int KEY_COUNT = 100;
+
+    /** Schema. */
+    public static SchemaDescriptor SCHEMA = new SchemaDescriptor(UUID.randomUUID(),
+        1,
+        new Column[] {new Column("key", NativeTypes.INTEGER, false)},
+        new Column[] {new Column("value", NativeTypes.INTEGER, false)}
+    );
+
+    /** Table command listener. */
+    private static PartitionCommandListener commandListener;
+
+    /**
+     * Inisializes a table listener before tests.
+     */
+    @BeforeAll
+    public static void before() {
+        commandListener = new PartitionCommandListener();
+    }
+
+    /**
+     *

Review comment:
       Please add at least a short explanation of the test scenario; an empty javadoc looks ugly.

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java
##########
@@ -172,22 +175,34 @@ public KVBinaryViewImpl(InternalTable tbl, SchemaRegistry schemaReg) {
 
     /** {@inheritDoc} */
     @Override public Collection<Tuple> removeAll(Collection<Tuple> keys) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        Objects.requireNonNull(keys);

Review comment:
       Let's add a @NotNull annotation for `keys` and also reflect that in the javadoc. Let's add this annotation everywhere in `KVBinaryViewImpl` where it is appropriate.

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java
##########
@@ -172,22 +175,34 @@ public KVBinaryViewImpl(InternalTable tbl, SchemaRegistry schemaReg) {
 
     /** {@inheritDoc} */
     @Override public Collection<Tuple> removeAll(Collection<Tuple> keys) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        Objects.requireNonNull(keys);
+
+        return sync(removeAllAsync(keys));
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Tuple> removeAllAsync(Collection<Tuple> keys) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+    @Override public @NotNull CompletableFuture<Collection<Tuple>> removeAllAsync(Collection<Tuple> keys) {
+        Objects.requireNonNull(keys);

Review comment:
       Let's add @NotNull annotation for keys and also reflect that in javadoc
   
   

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
##########
@@ -71,13 +85,37 @@ public InternalTableImpl(
 
     /** {@inheritDoc} */
     @Override public @NotNull CompletableFuture<BinaryRow> get(BinaryRow keyRow) {
-        return partitionMap.get(keyRow.hash() % partitions).<KVGetResponse>run(new GetCommand(keyRow))
-            .thenApply(KVGetResponse::getValue);
+        return partitionMap.get(keyRow.hash() % partitions).<SingleRowResponse>run(new GetCommand(keyRow))
+            .thenApply(response -> response.getValue());
     }
 
     /** {@inheritDoc} */
     @Override public @NotNull CompletableFuture<Collection<BinaryRow>> getAll(Collection<BinaryRow> keyRows) {
-        return null;
+        HashMap<Integer, HashSet<BinaryRow>> setByPartition = new HashMap<>();
+
+        for (BinaryRow keyRow : keyRows) {
+            setByPartition.computeIfAbsent(keyRow.hash() % partitions, HashSet::new)
+                .add(keyRow);
+        }
+
+        CompletableFuture<MultiRowsResponse>[] futures = new CompletableFuture[setByPartition.size()];
+
+        int batchNum = 0;
+
+        for (Map.Entry<Integer, HashSet<BinaryRow>> partToRows : setByPartition.entrySet()) {
+            futures[batchNum] = partitionMap.get(partToRows.getKey()).run(new GetAllCommand(partToRows.getValue()));
+
+            batchNum++;
+        }
+
+        CompletableFuture<Collection<BinaryRow>> future = CompletableFuture.allOf(futures)

Review comment:
       The local variable is redundant.

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
##########
@@ -71,13 +85,37 @@ public InternalTableImpl(
 
     /** {@inheritDoc} */
     @Override public @NotNull CompletableFuture<BinaryRow> get(BinaryRow keyRow) {
-        return partitionMap.get(keyRow.hash() % partitions).<KVGetResponse>run(new GetCommand(keyRow))
-            .thenApply(KVGetResponse::getValue);
+        return partitionMap.get(keyRow.hash() % partitions).<SingleRowResponse>run(new GetCommand(keyRow))
+            .thenApply(response -> response.getValue());
     }
 
     /** {@inheritDoc} */
     @Override public @NotNull CompletableFuture<Collection<BinaryRow>> getAll(Collection<BinaryRow> keyRows) {
-        return null;
+        HashMap<Integer, HashSet<BinaryRow>> setByPartition = new HashMap<>();

Review comment:
       Let's rename it: `keyRowsByPartition` seems better, since `setByPartition` does not say what the map holds.

##########
File path: modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
##########
@@ -242,71 +271,232 @@ public void partitionedTable() {
             }
         });
 
-        for (int i = 0; i < PARTS * 10; i++) {
-            tbl.kvView().putIfAbsent(
-                tbl.kvView().tupleBuilder()
+        partitionedTableView(tbl, PARTS * 10);
+
+        partitionedTableKVBinaryView(tbl.kvView(), PARTS * 10);
+    }
+
+    /**
+     * Checks operation over row table view.
+     *
+     * @param view Table view.
+     * @param keysCnt Count of keys.
+     */
+    public void partitionedTableView(Table view, int keysCnt) {
+        LOG.info("Tes for Table view [keys=" + keysCnt + ']');

Review comment:
       test

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
##########
@@ -71,13 +85,37 @@ public InternalTableImpl(
 

Review comment:
       Comment about the `InternalTableImpl` constructor: let's remove `this.partitions = partitions;`, since AFAIK `partitions` always equals `partitionMap.size()`.

##########
File path: modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
##########
@@ -242,71 +271,232 @@ public void partitionedTable() {
             }
         });
 
-        for (int i = 0; i < PARTS * 10; i++) {
-            tbl.kvView().putIfAbsent(
-                tbl.kvView().tupleBuilder()
+        partitionedTableView(tbl, PARTS * 10);
+
+        partitionedTableKVBinaryView(tbl.kvView(), PARTS * 10);
+    }
+
+    /**
+     * Checks operation over row table view.
+     *
+     * @param view Table view.
+     * @param keysCnt Count of keys.
+     */
+    public void partitionedTableView(Table view, int keysCnt) {
+        LOG.info("Tes for Table view [keys=" + keysCnt + ']');
+
+        for (int i = 0; i < keysCnt; i++) {
+            view.insert(view.tupleBuilder()
+                .set("key", Long.valueOf(i))
+                .set("value", Long.valueOf(i + 2))
+                .build()
+            );
+        }
+
+        for (int i = 0; i < keysCnt; i++) {
+            Tuple entry = view.get(view.tupleBuilder()
+                .set("key", Long.valueOf(i))
+                .build());
+
+            assertEquals(Long.valueOf(i + 2), entry.longValue("value"));
+        }
+
+        for (int i = 0; i < keysCnt; i++) {
+            view.upsert(view.tupleBuilder()
+                .set("key", Long.valueOf(i))
+                .set("value", Long.valueOf(i + 5))
+                .build()
+            );
+
+            Tuple entry = view.get(view.tupleBuilder()
+                .set("key", Long.valueOf(i))
+                .build());
+
+            assertEquals(Long.valueOf(i + 5), entry.longValue("value"));
+        }
+
+        HashSet<Tuple> keys = new HashSet<>();
+
+        for (int i = 0; i < keysCnt; i++) {
+            keys.add(view.tupleBuilder()
+                .set("key", Long.valueOf(i))
+                .build());
+        }
+
+        Collection<Tuple> entries = view.getAll(keys);
+
+        assertEquals(keysCnt, entries.size());
+
+        for (int i = 0; i < keysCnt; i++) {
+            boolean res = view.replace(
+                view.tupleBuilder()
                     .set("key", Long.valueOf(i))
+                    .set("value", Long.valueOf(i + 5))
                     .build(),
-                tbl.kvView().tupleBuilder()
+                view.tupleBuilder()
+                    .set("key", Long.valueOf(i))
                     .set("value", Long.valueOf(i + 2))
                     .build());
+
+            assertTrue(res);
+        }
+
+        for (int i = 0; i < keysCnt; i++) {
+            boolean res = view.delete(view.tupleBuilder()
+                .set("key", Long.valueOf(i))
+                .build());
+
+            assertTrue(res);
+
+            Tuple entry = view.get(view.tupleBuilder()
+                .set("key", Long.valueOf(i))
+                .build());
+
+            assertNull(entry);
         }
 
-        for (int i = 0; i < PARTS * 10; i++) {
-            Tuple entry = tbl.kvView().get(
-                tbl.kvView().tupleBuilder()
+        ArrayList<Tuple> batch = new ArrayList<>(keysCnt);
+
+        for (int i = 0; i < keysCnt; i++) {
+            batch.add(view.tupleBuilder()
+                .set("key", Long.valueOf(i))
+                .set("value", Long.valueOf(i + 2))
+                .build());
+        }
+
+        view.upsertAll(batch);
+
+        for (int i = 0; i < keysCnt; i++) {
+            Tuple entry = view.get(view.tupleBuilder()
+                .set("key", Long.valueOf(i))
+                .build());
+
+            assertEquals(Long.valueOf(i + 2), entry.longValue("value"));
+        }
+
+        view.deleteAll(keys);
+
+        for (Tuple key : keys) {
+            Tuple entry = view.get(key);
+
+            assertNull(entry);
+        }
+    }
+
+    /**
+     * Checks operation over key-value binary table view.
+     *
+     * @param view Table biew.

Review comment:
       view

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java
##########
@@ -172,22 +175,34 @@ public KVBinaryViewImpl(InternalTable tbl, SchemaRegistry schemaReg) {
 
     /** {@inheritDoc} */
     @Override public Collection<Tuple> removeAll(Collection<Tuple> keys) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        Objects.requireNonNull(keys);
+
+        return sync(removeAllAsync(keys));
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Tuple> removeAllAsync(Collection<Tuple> keys) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+    @Override public @NotNull CompletableFuture<Collection<Tuple>> removeAllAsync(Collection<Tuple> keys) {
+        Objects.requireNonNull(keys);
+
+        return tbl.deleteAll(keys.stream().map(k -> marsh.marshal(k, null)).collect(Collectors.toList()))
+            .thenApply(this::wrap)
+            .thenApply(ts -> ts.stream().filter(Objects::nonNull).map(TableRow::valueChunk).collect(Collectors.toList()));

Review comment:
       I would rename ts to t 

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java
##########
@@ -172,22 +175,34 @@ public KVBinaryViewImpl(InternalTable tbl, SchemaRegistry schemaReg) {
 
     /** {@inheritDoc} */
     @Override public Collection<Tuple> removeAll(Collection<Tuple> keys) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        Objects.requireNonNull(keys);
+
+        return sync(removeAllAsync(keys));
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Tuple> removeAllAsync(Collection<Tuple> keys) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+    @Override public @NotNull CompletableFuture<Collection<Tuple>> removeAllAsync(Collection<Tuple> keys) {
+        Objects.requireNonNull(keys);
+
+        return tbl.deleteAll(keys.stream().map(k -> marsh.marshal(k, null)).collect(Collectors.toList()))
+            .thenApply(this::wrap)
+            .thenApply(ts -> ts.stream().filter(Objects::nonNull).map(TableRow::valueChunk).collect(Collectors.toList()));
     }
 
     /** {@inheritDoc} */
     @Override public Tuple getAndRemove(Tuple key) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        Objects.requireNonNull(key);

Review comment:
       Let's add a @NotNull annotation for `key` and also reflect that in the javadoc.
   
   

##########
File path: modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
##########
@@ -0,0 +1,705 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.Row;
+import org.apache.ignite.internal.schema.RowAssembler;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAllCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndDeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndUpsertCommand;
+import org.apache.ignite.internal.table.distributed.command.GetCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertAllCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertCommand;
+import org.apache.ignite.internal.table.distributed.command.ReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.ReplaceIfExistCommand;
+import org.apache.ignite.internal.table.distributed.command.UpsertAllCommand;
+import org.apache.ignite.internal.table.distributed.command.UpsertCommand;
+import org.apache.ignite.internal.table.distributed.command.response.MultiRowsResponse;
+import org.apache.ignite.internal.table.distributed.command.response.SingleRowResponse;
+import org.apache.ignite.raft.client.Command;
+import org.apache.ignite.raft.client.service.CommandClosure;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * There are a tests for a table command listener.
+ */
+public class PartitionCommandListenerTest {
+    /** Key count. */
+    public static final int KEY_COUNT = 100;
+
+    /** Schema. */
+    public static SchemaDescriptor SCHEMA = new SchemaDescriptor(UUID.randomUUID(),
+        1,
+        new Column[] {new Column("key", NativeTypes.INTEGER, false)},
+        new Column[] {new Column("value", NativeTypes.INTEGER, false)}
+    );
+
+    /** Table command listener. */
+    private static PartitionCommandListener commandListener;
+
+    /**
+     * Inisializes a table listener before tests.
+     */
+    @BeforeAll
+    public static void before() {
+        commandListener = new PartitionCommandListener();
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testInsertCommands() {
+        readAndChak(false);
+
+        delete(false);
+
+        insert(false);
+
+        insert(true);
+
+        readAndChak(true);
+
+        delete(true);
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testUpsertValues() {
+        readAndChak(false);
+
+        upsert();
+
+        readAndChak(true);
+
+        delete(true);
+
+        readAndChak(false);
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testReplaceCommand() {
+        upsert();
+
+        deleteExactValues(false);
+
+        replaceValues(true);
+
+        readAndChak(true, i -> i + 1);
+
+        replaceValues(false);
+
+        readAndChak(true, i -> i + 1);
+
+        deleteExactValues(true);
+
+        readAndChak(false);
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testPutIfExistCommand() {
+        putIfExistValues(false);
+
+        readAndChak(false);
+
+        upsert();
+
+        putIfExistValues(true);
+
+        readAndChak(true, i -> i + 1);
+
+        getAndDeleteValues(true);
+
+        readAndChak(false);
+
+        getAndDeleteValues(false);
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testGetAndReplaceCommand() {
+        readAndChak(false);
+
+        getAndUpsertValues(false);
+
+        readAndChak(true);
+
+        getAndReplaceValues(true);
+
+        readAndChak(true, i -> i + 1);
+
+        getAndUpsertValues(true);
+
+        readAndChak(true);
+
+        deleteExactAllValues(true);
+
+        readAndChak(false);
+
+        getAndReplaceValues(false);
+
+        deleteExactAllValues(false);
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testUpsertRowsBatchedAndCheck() {
+        readAll(false);
+
+        deleteAll(false);
+
+        upsertAll();
+
+        readAll(true);
+
+        deleteAll(true);
+
+        readAll(false);
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testInsertRowsBatchedAndCheck() {
+        readAll(false);
+
+        deleteAll(false);
+
+        insertAll(false);
+
+        readAll(true);
+
+        insertAll(true);
+
+        deleteAll(true);
+
+        readAll(false);
+    }
+
+    /**
+     * Prepares a closure iterator for a specific batch operation.
+     *
+     * @param func The function prepare a closure for the operation.
+     * @param <T> Type of the operation.
+     * @return Closure iterator.
+     */
+    private <T extends Command> Iterator<CommandClosure<T>> batchIterator(Consumer<CommandClosure<T>> func) {
+        return new Iterator<CommandClosure<T>>() {
+            boolean moved;
+
+            @Override public boolean hasNext() {
+                return !moved;
+            }
+
+            @Override public CommandClosure<T> next() {
+                CommandClosure<T> clo = mock(CommandClosure.class);
+
+                doAnswer(invocation -> {
+                    fail("Exception happened: " + invocation.getArgument(0));
+
+                    return null;
+                }).when(clo).failure(any());
+
+                func.accept(clo);
+
+                moved = true;
+
+                return clo;
+            }
+        };
+    }
+
+    /**
+     * Prepares a closure iterator for a specific operation.
+     *
+     * @param func The function prepare a closure for the operation.
+     * @param <T> Type of the operation.
+     * @return Closure iterator.
+     */
+    private <T extends Command> Iterator<CommandClosure<T>> iterator(BiConsumer<Integer, CommandClosure<T>> func) {
+        return new Iterator<CommandClosure<T>>() {
+            /** Iteration. */
+            private int i = 0;
+
+            /** {@inheritDoc} */
+            @Override public boolean hasNext() {
+                return i < KEY_COUNT;
+            }
+
+            /** {@inheritDoc} */
+            @Override public CommandClosure<T> next() {
+                CommandClosure<T> clo = mock(CommandClosure.class);
+
+                doAnswer(invocation -> {
+                    fail("Exception happened: " + invocation.getArgument(0));
+
+                    return null;
+                }).when(clo).failure(any());
+
+                func.accept(i, clo);
+
+                i++;
+
+                return clo;
+            }
+        };
+    }
+
+    /**
+     * @param existed True if rows are existed, false otherwise.
+     */
+    private void insertAll(boolean existed) {
+        commandListener.onWrite(batchIterator(clo -> {
+            doAnswer(invocation -> {
+                MultiRowsResponse resp = invocation.getArgument(0);
+
+                if (existed) {
+                    assertEquals(KEY_COUNT, resp.getValues().size());
+
+                    for (BinaryRow binaryRow : resp.getValues()) {
+                        Row row = new Row(SCHEMA, binaryRow);
+
+                        int keyVal = row.intValue(0);
+
+                        assertTrue(keyVal < KEY_COUNT);
+                        assertEquals(keyVal, row.intValue(1));
+                    }
+                }
+                else
+                    assertTrue(resp.getValues().isEmpty());
+
+                return null;
+            }).when(clo).success(any(MultiRowsResponse.class));
+
+            Set<BinaryRow> rows = new HashSet<>(KEY_COUNT);
+
+            for (int i = 0; i < KEY_COUNT; i++)
+                rows.add(getTestRow(i, i));
+
+            when(clo.command()).thenReturn(new InsertAllCommand(rows));
+        }));
+    }
+
+    /**
+     * Upserts values from the listener in the batch operation.
+     */
+    private void upsertAll() {
+        commandListener.onWrite(batchIterator(clo -> {
+            doAnswer(invocation -> {
+                assertNull(invocation.getArgument(0));
+
+                return null;
+            }).when(clo).success(any());
+
+            Set<BinaryRow> rows = new HashSet<>(KEY_COUNT);
+
+            for (int i = 0; i < KEY_COUNT; i++)
+                rows.add(getTestRow(i, i));
+
+            when(clo.command()).thenReturn(new UpsertAllCommand(rows));
+        }));
+    }
+
+    /**
+     * @param existed True if rows are existed, false otherwise.
+     */
+    private void deleteAll(boolean existed) {
+        commandListener.onWrite(batchIterator(clo -> {
+            doAnswer(invocation -> {
+                MultiRowsResponse resp = invocation.getArgument(0);
+
+                if (existed) {
+                    assertEquals(KEY_COUNT, resp.getValues().size());
+
+                    for (BinaryRow binaryRow : resp.getValues()) {
+                        Row row = new Row(SCHEMA, binaryRow);
+
+                        int keyVal = row.intValue(0);
+
+                        assertTrue(keyVal < KEY_COUNT);
+                        assertEquals(keyVal, row.intValue(1));
+                    }
+                }
+                else
+                    assertTrue(resp.getValues().isEmpty());
+
+                return null;
+            }).when(clo).success(any(MultiRowsResponse.class));
+
+            Set<BinaryRow> keyRows = new HashSet<>(KEY_COUNT);
+
+            for (int i = 0; i < KEY_COUNT; i++)
+                keyRows.add(getTestKey(i));
+
+            when(clo.command()).thenReturn(new DeleteAllCommand(keyRows));
+        }));
+    }
+
+    /**
+     * @param existed True if rows are existed, false otherwise.
+     */
+    private void readAll(boolean existed) {
+        commandListener.onRead(batchIterator(clo -> {
+            doAnswer(invocation -> {
+                MultiRowsResponse resp = invocation.getArgument(0);
+
+                if (existed) {
+                    assertEquals(KEY_COUNT, resp.getValues().size());
+
+                    for (BinaryRow binaryRow : resp.getValues()) {
+                        Row row = new Row(SCHEMA, binaryRow);
+
+                        int keyVal = row.intValue(0);
+
+                        assertTrue(keyVal < KEY_COUNT);
+                        assertEquals(keyVal, row.intValue(1));
+                    }
+                }
+                else
+                    assertTrue(resp.getValues().isEmpty());
+
+                return null;
+            }).when(clo).success(any(MultiRowsResponse.class));
+
+            Set<BinaryRow> keyRows = new HashSet<>(KEY_COUNT);
+
+            for (int i = 0; i < KEY_COUNT; i++)
+                keyRows.add(getTestKey(i));
+
+            when(clo.command()).thenReturn(new GetAllCommand(keyRows));
+        }));
+    }
+
+    /**
+     * Upserts rows.
+     */
+    private void upsert() {
+        commandListener.onWrite(iterator((i, clo) -> {
+            when(clo.command()).thenReturn(new UpsertCommand(getTestRow(i, i)));
+
+            doAnswer(invocation -> {
+                assertNull(invocation.getArgument(0));
+
+                return null;
+            }).when(clo).success(any());
+        }));
+    }
+
+    /**
+     * @param existed True if rows are existed, false otherwise.
+     */
+    private void delete(boolean existed) {
+        commandListener.onWrite(iterator((i, clo) -> {
+            when(clo.command()).thenReturn(new DeleteCommand(getTestKey(i)));
+
+            doAnswer(invocation -> {
+                assertEquals(existed, invocation.getArgument(0));
+
+                return null;
+            }).when(clo).success(any());
+        }));
+    }
+
+    /**
+     * Reads rows from the listener and checks them.
+     *
+     * @param existed True if rows are existed, false otherwise.
+     */
+    private void readAndChak(boolean existed) {
+        readAndChak(existed, i -> i);
+    }
+
+    /**
+     * Reades rows from the listener and checks values as expected by a mapper.
+     *
+     * @param existed True if rows are existed, false otherwise.
+     * @param keyValueMapper Mapper a key to the value which will be expected.
+     */
+    private void readAndChak(boolean existed, Function<Integer, Integer> keyValueMapper) {

Review comment:
       checks 

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java
##########
@@ -172,22 +175,34 @@ public KVBinaryViewImpl(InternalTable tbl, SchemaRegistry schemaReg) {
 
     /** {@inheritDoc} */
     @Override public Collection<Tuple> removeAll(Collection<Tuple> keys) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        Objects.requireNonNull(keys);
+
+        return sync(removeAllAsync(keys));
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Tuple> removeAllAsync(Collection<Tuple> keys) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+    @Override public @NotNull CompletableFuture<Collection<Tuple>> removeAllAsync(Collection<Tuple> keys) {
+        Objects.requireNonNull(keys);
+
+        return tbl.deleteAll(keys.stream().map(k -> marsh.marshal(k, null)).collect(Collectors.toList()))
+            .thenApply(this::wrap)
+            .thenApply(ts -> ts.stream().filter(Objects::nonNull).map(TableRow::valueChunk).collect(Collectors.toList()));
     }
 
     /** {@inheritDoc} */
     @Override public Tuple getAndRemove(Tuple key) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        Objects.requireNonNull(key);
+
+        return sync(getAndRemoveAsync(key));
     }
 
     /** {@inheritDoc} */
     @Override public @NotNull CompletableFuture<Tuple> getAndRemoveAsync(Tuple key) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        Objects.requireNonNull(key);

Review comment:
       Let's add a @NotNull annotation for `key` and also reflect that in the javadoc.
   
   

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java
##########
@@ -289,4 +306,25 @@ protected TableRow wrap(BinaryRow row) {
 
         return new TableRow(schema, new Row(schema, row));
     }
+
+    /**
+     * @param rows Binary rows.
+     * @return Table rows.
+     */
+    private Collection<TableRow> wrap(Collection<BinaryRow> rows) {

Review comment:
       Let's add javadoc here and for `KVBinaryViewImpl#wrap(org.apache.ignite.internal.schema.BinaryRow)`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org