Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/05/05 08:51:33 UTC

[GitHub] [ignite-3] vldpyatkov opened a new pull request #118: IGNITE-14239 Raft based implementation of atomic protocol

vldpyatkov opened a new pull request #118:
URL: https://github.com/apache/ignite-3/pull/118


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] vldpyatkov commented on a change in pull request #118: IGNITE-14239 Raft based implementation of atomic protocol

Posted by GitBox <gi...@apache.org>.
vldpyatkov commented on a change in pull request #118:
URL: https://github.com/apache/ignite-3/pull/118#discussion_r640357354



##########
File path: modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
##########
@@ -242,71 +271,232 @@ public void partitionedTable() {
             }
         });
 
-        for (int i = 0; i < PARTS * 10; i++) {
-            tbl.kvView().putIfAbsent(
-                tbl.kvView().tupleBuilder()
+        partitionedTableView(tbl, PARTS * 10);
+
+        partitionedTableKVBinaryView(tbl.kvView(), PARTS * 10);
+    }
+
+    /**
+     * Checks operation over row table view.
+     *
+     * @param view Table view.
+     * @param keysCnt Count of keys.
+     */
+    public void partitionedTableView(Table view, int keysCnt) {
+        LOG.info("Tes for Table view [keys=" + keysCnt + ']');
+
+        for (int i = 0; i < keysCnt; i++) {
+            view.insert(view.tupleBuilder()
+                .set("key", Long.valueOf(i))
+                .set("value", Long.valueOf(i + 2))
+                .build()
+            );
+        }
+
+        for (int i = 0; i < keysCnt; i++) {
+            Tuple entry = view.get(view.tupleBuilder()
+                .set("key", Long.valueOf(i))
+                .build());
+
+            assertEquals(Long.valueOf(i + 2), entry.longValue("value"));
+        }
+
+        for (int i = 0; i < keysCnt; i++) {
+            view.upsert(view.tupleBuilder()
+                .set("key", Long.valueOf(i))
+                .set("value", Long.valueOf(i + 5))
+                .build()
+            );
+
+            Tuple entry = view.get(view.tupleBuilder()
+                .set("key", Long.valueOf(i))
+                .build());
+
+            assertEquals(Long.valueOf(i + 5), entry.longValue("value"));
+        }
+
+        HashSet<Tuple> keys = new HashSet<>();
+
+        for (int i = 0; i < keysCnt; i++) {
+            keys.add(view.tupleBuilder()
+                .set("key", Long.valueOf(i))
+                .build());
+        }
+
+        Collection<Tuple> entries = view.getAll(keys);
+
+        assertEquals(keysCnt, entries.size());
+
+        for (int i = 0; i < keysCnt; i++) {
+            boolean res = view.replace(
+                view.tupleBuilder()
                     .set("key", Long.valueOf(i))
+                    .set("value", Long.valueOf(i + 5))
                     .build(),
-                tbl.kvView().tupleBuilder()
+                view.tupleBuilder()
+                    .set("key", Long.valueOf(i))
                     .set("value", Long.valueOf(i + 2))
                     .build());
+
+            assertTrue(res);
+        }
+
+        for (int i = 0; i < keysCnt; i++) {
+            boolean res = view.delete(view.tupleBuilder()
+                .set("key", Long.valueOf(i))
+                .build());
+
+            assertTrue(res);
+
+            Tuple entry = view.get(view.tupleBuilder()
+                .set("key", Long.valueOf(i))
+                .build());
+
+            assertNull(entry);
         }
 
-        for (int i = 0; i < PARTS * 10; i++) {
-            Tuple entry = tbl.kvView().get(
-                tbl.kvView().tupleBuilder()
+        ArrayList<Tuple> batch = new ArrayList<>(keysCnt);
+
+        for (int i = 0; i < keysCnt; i++) {
+            batch.add(view.tupleBuilder()
+                .set("key", Long.valueOf(i))
+                .set("value", Long.valueOf(i + 2))
+                .build());
+        }
+
+        view.upsertAll(batch);
+
+        for (int i = 0; i < keysCnt; i++) {
+            Tuple entry = view.get(view.tupleBuilder()
+                .set("key", Long.valueOf(i))
+                .build());
+
+            assertEquals(Long.valueOf(i + 2), entry.longValue("value"));
+        }
+
+        view.deleteAll(keys);
+
+        for (Tuple key : keys) {
+            Tuple entry = view.get(key);
+
+            assertNull(entry);
+        }
+    }
+
+    /**
+     * Checks operation over key-value binary table view.
+     *
+     * @param view Table biew.

Review comment:
       Done.







[GitHub] [ignite-3] vldpyatkov commented on a change in pull request #118: IGNITE-14239 Raft based implementation of atomic protocol

Posted by GitBox <gi...@apache.org>.
vldpyatkov commented on a change in pull request #118:
URL: https://github.com/apache/ignite-3/pull/118#discussion_r640365754



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java
##########
@@ -289,4 +306,25 @@ protected TableRow wrap(BinaryRow row) {
 
         return new TableRow(schema, new Row(schema, row));
     }
+
+    /**
+     * @param rows Binary rows.
+     * @return Table rows.
+     */
+    private Collection<TableRow> wrap(Collection<BinaryRow> rows) {

Review comment:
       Added.







[GitHub] [ignite-3] alievmirza commented on a change in pull request #118: IGNITE-14239 Raft based implementation of atomic protocol

Posted by GitBox <gi...@apache.org>.
alievmirza commented on a change in pull request #118:
URL: https://github.com/apache/ignite-3/pull/118#discussion_r639871684



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/CommandUtils.java
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.lang.IgniteLogger;
+
+/**
+ * This is an utility class for serialization cache tuples. It will be removed after another way for serialization is
+ * implemented into the network layer.

Review comment:
       If this solution is temporary, where is the TODO?

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
##########
@@ -102,12 +158,36 @@ public InternalTableImpl(
 
     /** {@inheritDoc} */
     @Override public @NotNull CompletableFuture<Collection<BinaryRow>> insertAll(Collection<BinaryRow> rows) {
-        return null;
+        HashMap<Integer, HashSet<BinaryRow>> setByPartition = new HashMap<>();
+
+        for (BinaryRow keyRow : rows) {
+            setByPartition.computeIfAbsent(keyRow.hash() % partitions, HashSet::new)
+                .add(keyRow);
+        }
+
+        CompletableFuture<MultiRowsResponse>[] futures = new CompletableFuture[setByPartition.size()];
+
+        int batchNum = 0;
+
+        for (Map.Entry<Integer, HashSet<BinaryRow>> partToRows : setByPartition.entrySet()) {
+            futures[batchNum] = partitionMap.get(partToRows.getKey()).run(new InsertAllCommand(partToRows.getValue()));
+
+            batchNum++;
+        }
+
+        CompletableFuture<Collection<BinaryRow>> future = CompletableFuture.allOf(futures)

Review comment:
       The local variable `future` is redundant.
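
A minimal sketch of the shape this comment points at: group the rows by partition, send one batch per partition, and return the combined future directly instead of storing it in a local first. Everything here is a stand-in, not the PR's code: `PartitionedBatchSketch`, `groupByPartition`, `runAll` and the `sendBatch` function are hypothetical names, and `hashCode()` stands in for `BinaryRow.hash()`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical sketch: per-partition batching with the combined future returned directly. */
final class PartitionedBatchSketch {
    private PartitionedBatchSketch() {
    }

    /** Groups rows by partition index; hashCode() stands in for BinaryRow.hash(). */
    static <R> Map<Integer, Set<R>> groupByPartition(Collection<R> rows, int partitions) {
        Map<Integer, Set<R>> byPartition = new HashMap<>();

        for (R row : rows) {
            // floorMod keeps the index in [0, partitions) even for a negative hash.
            int part = Math.floorMod(row.hashCode(), partitions);

            // An explicit lambda avoids HashSet::new, which would treat the key as an initial capacity.
            byPartition.computeIfAbsent(part, p -> new HashSet<>()).add(row);
        }

        return byPartition;
    }

    /** Sends one batch per partition and merges the results once every batch has completed. */
    static <R> CompletableFuture<Collection<R>> runAll(
        Collection<R> rows,
        int partitions,
        Function<Set<R>, CompletableFuture<Collection<R>>> sendBatch
    ) {
        List<CompletableFuture<Collection<R>>> futures = new ArrayList<>();

        for (Set<R> batch : groupByPartition(rows, partitions).values())
            futures.add(sendBatch.apply(batch));

        // No intermediate local: the chained future is returned as is.
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(ignored -> {
                Collection<R> merged = new ArrayList<>();

                for (CompletableFuture<Collection<R>> f : futures)
                    merged.addAll(f.join()); // Already complete, so join() does not block.

                return merged;
            });
    }
}
```

The sketch uses `Math.floorMod` and an explicit lambda as defensive choices. Whether `BinaryRow.hash()` can be negative is not stated in the quoted diff, so treat that part as an assumption.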

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListener.java
##########
@@ -163,4 +308,26 @@ else if (clo.command() instanceof UpsertCommand) {
             return hash;
         }
     }
+
+    /**
+     * @param row Row.
+     * @return Extracted key.
+     */
+    @NotNull private boolean equalValues(@NotNull BinaryRow row, @NotNull BinaryRow row2) {

Review comment:
       You don't need @NotNull on the return value, since the return type is primitive.

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListener.java
##########
@@ -90,38 +123,150 @@ else if (clo.command() instanceof ReplaceCommand) {
                     clo.success(false);
             }
             else if (clo.command() instanceof UpsertCommand) {
-                storage.put(
-                    extractAndWrapKey(((UpsertCommand)clo.command()).getRow()),
-                    ((UpsertCommand)clo.command()).getRow()
-                );
+                BinaryRow row = ((UpsertCommand)clo.command()).getRow();
+
+                assert row.hasValue() : "Upsert command should have a value.";
+
+                storage.put(extractAndWrapKey(row), row);
 
                 clo.success(null);
             }
-            else
-                assert false : "Command was not found [cmd=" + clo.command() + ']';
-        }
-    }
+            else if (clo.command() instanceof InsertAllCommand) {
+                Set<BinaryRow> rows = ((InsertAllCommand)clo.command()).getRows();
 
-    /**
-     * @param row Row.
-     * @return Extracted key.
-     */
-    @NotNull private boolean equalValues(@NotNull BinaryRow row, @NotNull BinaryRow row2) {
-        if (row.hasValue() ^ row2.hasValue())
-            return false;
+                assert rows != null && !rows.isEmpty();
 
-        return row.valueSlice().compareTo(row2.valueSlice()) == 0;
-    }
+                final Set<BinaryRow> res = rows.stream()
+                    .map(k -> storage.putIfAbsent(extractAndWrapKey(k), k) == null ? null : k)
+                    .filter(Objects::nonNull)
+                    .filter(BinaryRow::hasValue)
+                    .collect(Collectors.toSet());
 
-    /**
-     * @param row Row.
-     * @return Extracted key.
-     */
-    @NotNull private KeyWrapper extractAndWrapKey(@NotNull BinaryRow row) {
-        final byte[] bytes = new byte[row.keySlice().capacity()];
-        row.keySlice().get(bytes);
+                clo.success(new MultiRowsResponse(res));
+            }
+            else if (clo.command() instanceof UpsertAllCommand) {
+                Set<BinaryRow> rows = ((UpsertAllCommand)clo.command()).getRows();
 
-        return new KeyWrapper(bytes, row.hash());
+                assert rows != null && !rows.isEmpty();
+
+                rows.stream()

Review comment:
       The `stream()` call is redundant here.
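
For illustration, a small sketch of the same loop without the intermediate stream: `Iterable.forEach` is available on any collection directly. The row format (`"key=value"` strings), `extractKey` and the `UpsertAllSketch` class are hypothetical stand-ins for `BinaryRow`, `extractAndWrapKey` and the listener's storage.

```java
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch: batch upsert without an intermediate stream. */
final class UpsertAllSketch {
    private UpsertAllSketch() {
    }

    /** Stand-in for extractAndWrapKey: the key is the part before '='. */
    static String extractKey(String row) {
        return row.substring(0, row.indexOf('='));
    }

    /** Puts every row into the storage, overwriting any existing entry with the same key. */
    static void upsertAll(Map<String, String> storage, Set<String> rows) {
        // Iterable.forEach replaces rows.stream().forEach(...).
        rows.forEach(row -> storage.put(extractKey(row), row));
    }
}
```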

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListener.java
##########
@@ -90,38 +123,150 @@ else if (clo.command() instanceof ReplaceCommand) {
                     clo.success(false);

Review comment:
       ```
                   if ((current == null && !expected.hasValue()) ||
                       equalValues(current, expected)) {
                       storage.put(key, cmd.getRow());
                       clo.success(true);
                   }
   ```
   There is a possible NPE here because `current` might be null when you pass it to `equalValues`.
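
One way to make the comparison null-safe is sketched below. It assumes a null row should be treated like a row without a value; the PR may have chosen a different contract. `RowLike`, `NullSafeRowCompare` and `row(...)` are hypothetical stand-ins for `BinaryRow` and the listener's helper, keeping only the two methods the quoted `equalValues` uses.

```java
import java.nio.ByteBuffer;

/** Hypothetical stand-in for BinaryRow: only the two methods the comparison needs. */
interface RowLike {
    boolean hasValue();

    ByteBuffer valueSlice();
}

/** Hypothetical sketch of a null-safe value comparison. */
final class NullSafeRowCompare {
    private NullSafeRowCompare() {
    }

    /** Builds a row from raw value bytes; null means "key only, no value". */
    static RowLike row(byte[] value) {
        return new RowLike() {
            @Override public boolean hasValue() {
                return value != null;
            }

            @Override public ByteBuffer valueSlice() {
                return ByteBuffer.wrap(value);
            }
        };
    }

    /**
     * Compares the value parts of two rows.
     * Assumption: a null row is equivalent to a row that has no value.
     */
    static boolean equalValues(RowLike a, RowLike b) {
        boolean aHasValue = a != null && a.hasValue();
        boolean bHasValue = b != null && b.hasValue();

        if (aHasValue != bHasValue)
            return false;

        if (!aHasValue)
            return true; // Neither side carries a value.

        return a.valueSlice().compareTo(b.valueSlice()) == 0;
    }
}
```

With this contract, the separate `current == null && !expected.hasValue()` check in the calling condition becomes unnecessary, because `equalValues(null, expected)` already returns true in that case.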

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListener.java
##########
@@ -57,10 +89,11 @@
             CommandClosure<WriteCommand> clo = iterator.next();
 
             if (clo.command() instanceof InsertCommand) {
-                BinaryRow previous = storage.putIfAbsent(
-                    extractAndWrapKey(((InsertCommand)clo.command()).getRow()),
-                    ((InsertCommand)clo.command()).getRow()
-                );
+                BinaryRow row = ((InsertCommand)clo.command()).getRow();
+
+                assert row.hasValue() : "Insert command should have a value.";
+
+                BinaryRow previous = storage.putIfAbsent(extractAndWrapKey(row), row);

Review comment:
       I didn't get the idea of `storage` in `PartitionCommandListener`. When will this storage be purged? Do we store all partitions' data in this storage? It seems that this approach may lead to OOM. If this solution is temporary, where is the TODO describing the future changes?

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListener.java
##########
@@ -163,4 +308,26 @@ else if (clo.command() instanceof UpsertCommand) {
             return hash;
         }
     }
+
+    /**
+     * @param row Row.

Review comment:
       Please add a descriptive javadoc that states the contract for the comparison.

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListener.java
##########
@@ -90,38 +123,150 @@ else if (clo.command() instanceof ReplaceCommand) {
                     clo.success(false);
             }
             else if (clo.command() instanceof UpsertCommand) {
-                storage.put(
-                    extractAndWrapKey(((UpsertCommand)clo.command()).getRow()),
-                    ((UpsertCommand)clo.command()).getRow()
-                );
+                BinaryRow row = ((UpsertCommand)clo.command()).getRow();
+
+                assert row.hasValue() : "Upsert command should have a value.";
+
+                storage.put(extractAndWrapKey(row), row);
 
                 clo.success(null);
             }
-            else
-                assert false : "Command was not found [cmd=" + clo.command() + ']';
-        }
-    }
+            else if (clo.command() instanceof InsertAllCommand) {
+                Set<BinaryRow> rows = ((InsertAllCommand)clo.command()).getRows();
 
-    /**
-     * @param row Row.
-     * @return Extracted key.
-     */
-    @NotNull private boolean equalValues(@NotNull BinaryRow row, @NotNull BinaryRow row2) {
-        if (row.hasValue() ^ row2.hasValue())
-            return false;
+                assert rows != null && !rows.isEmpty();
 
-        return row.valueSlice().compareTo(row2.valueSlice()) == 0;
-    }
+                final Set<BinaryRow> res = rows.stream()
+                    .map(k -> storage.putIfAbsent(extractAndWrapKey(k), k) == null ? null : k)
+                    .filter(Objects::nonNull)
+                    .filter(BinaryRow::hasValue)
+                    .collect(Collectors.toSet());
 
-    /**
-     * @param row Row.
-     * @return Extracted key.
-     */
-    @NotNull private KeyWrapper extractAndWrapKey(@NotNull BinaryRow row) {
-        final byte[] bytes = new byte[row.keySlice().capacity()];
-        row.keySlice().get(bytes);
+                clo.success(new MultiRowsResponse(res));
+            }
+            else if (clo.command() instanceof UpsertAllCommand) {
+                Set<BinaryRow> rows = ((UpsertAllCommand)clo.command()).getRows();
 
-        return new KeyWrapper(bytes, row.hash());
+                assert rows != null && !rows.isEmpty();
+
+                rows.stream()
+                    .forEach(k -> storage.put(extractAndWrapKey(k), k));
+
+                clo.success(null);
+            }
+            else if (clo.command() instanceof DeleteAllCommand) {
+                Set<BinaryRow> rows = ((DeleteAllCommand)clo.command()).getRows();
+
+                assert rows != null && !rows.isEmpty();
+
+                final Set<BinaryRow> res = rows.stream()
+                    .map(k -> {
+                        if (k.hasValue())
+                            return null;
+                        else {
+                            BinaryRow r = storage.remove(extractAndWrapKey(k));
+
+                            if (r == null)

Review comment:
       You can just return `r` instead of the `if` check.
   

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpsertAllCommand.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.raft.client.WriteCommand;
+
+/**
+ * The command puts a batch rows.
+ */
+public class UpsertAllCommand implements WriteCommand {
+    /** Rows. */
+    private transient Set<BinaryRow> rows;
+
+    /*
+     * Row bytes.
+     * It is a temporary solution, before network have not implement correct serialization BinaryRow.
+     * TODO: Remove the field after.

Review comment:
       This needs a ticket, and so does every other place with a TODO.







[GitHub] [ignite-3] vldpyatkov commented on a change in pull request #118: IGNITE-14239 Raft based implementation of atomic protocol

Posted by GitBox <gi...@apache.org>.
vldpyatkov commented on a change in pull request #118:
URL: https://github.com/apache/ignite-3/pull/118#discussion_r640454935



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListener.java
##########
@@ -90,38 +123,150 @@ else if (clo.command() instanceof ReplaceCommand) {
                     clo.success(false);

Review comment:
       `equalValues` no longer throws an NPE when one row is null.







[GitHub] [ignite-3] alievmirza commented on a change in pull request #118: IGNITE-14239 Raft based implementation of atomic protocol

Posted by GitBox <gi...@apache.org>.
alievmirza commented on a change in pull request #118:
URL: https://github.com/apache/ignite-3/pull/118#discussion_r641283000



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
##########
@@ -107,12 +109,22 @@ public SchemaRegistry schemaView() {
 
     /** {@inheritDoc} */
     @Override public Collection<Tuple> getAll(Collection<Tuple> keyRecs) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        return sync(getAllAsync(keyRecs));
     }
 
     /** {@inheritDoc} */
     @Override public @NotNull CompletableFuture<Collection<Tuple>> getAllAsync(Collection<Tuple> keyRecs) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        Objects.requireNonNull(keyRecs);

Review comment:
       Please add the @NotNull annotation here and below, and also reflect that in the TableView methods and javadocs, as we did for KeyValueView.java.







[GitHub] [ignite-3] vldpyatkov commented on a change in pull request #118: IGNITE-14239 Raft based implementation of atomic protocol

Posted by GitBox <gi...@apache.org>.
vldpyatkov commented on a change in pull request #118:
URL: https://github.com/apache/ignite-3/pull/118#discussion_r643025623



##########
File path: modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
##########
@@ -0,0 +1,699 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.Row;
+import org.apache.ignite.internal.schema.RowAssembler;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAllCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndDeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndUpsertCommand;
+import org.apache.ignite.internal.table.distributed.command.GetCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertAllCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertCommand;
+import org.apache.ignite.internal.table.distributed.command.ReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.ReplaceIfExistCommand;
+import org.apache.ignite.internal.table.distributed.command.UpsertAllCommand;
+import org.apache.ignite.internal.table.distributed.command.UpsertCommand;
+import org.apache.ignite.internal.table.distributed.command.response.MultiRowsResponse;
+import org.apache.ignite.internal.table.distributed.command.response.SingleRowResponse;
+import org.apache.ignite.raft.client.Command;
+import org.apache.ignite.raft.client.service.CommandClosure;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * There are a tests for a table command listener.
+ */
+public class PartitionCommandListenerTest {
+    /** Key count. */
+    public static final int KEY_COUNT = 100;
+
+    /** Schema. */
+    public static SchemaDescriptor SCHEMA = new SchemaDescriptor(UUID.randomUUID(),
+        1,
+        new Column[] {new Column("key", NativeTypes.INTEGER, false)},
+        new Column[] {new Column("value", NativeTypes.INTEGER, false)}
+    );
+
+    /** Table command listener. */
+    private static PartitionListener commandListener;
+
+    /**
+     * Inisializes a table listener before tests.
+     */
+    @BeforeAll
+    public static void before() {
+        commandListener = new PartitionListener();
+    }
+
+    /**
+     * Insrets rows and checks them.

Review comment:
       Done.







[GitHub] [ignite-3] vldpyatkov commented on a change in pull request #118: IGNITE-14239 Raft based implementation of atomic protocol

Posted by GitBox <gi...@apache.org>.
vldpyatkov commented on a change in pull request #118:
URL: https://github.com/apache/ignite-3/pull/118#discussion_r643023520



##########
File path: modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
##########
@@ -0,0 +1,699 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.Row;
+import org.apache.ignite.internal.schema.RowAssembler;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAllCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndDeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndUpsertCommand;
+import org.apache.ignite.internal.table.distributed.command.GetCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertAllCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertCommand;
+import org.apache.ignite.internal.table.distributed.command.ReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.ReplaceIfExistCommand;
+import org.apache.ignite.internal.table.distributed.command.UpsertAllCommand;
+import org.apache.ignite.internal.table.distributed.command.UpsertCommand;
+import org.apache.ignite.internal.table.distributed.command.response.MultiRowsResponse;
+import org.apache.ignite.internal.table.distributed.command.response.SingleRowResponse;
+import org.apache.ignite.raft.client.Command;
+import org.apache.ignite.raft.client.service.CommandClosure;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * There are a tests for a table command listener.
+ */
+public class PartitionCommandListenerTest {
+    /** Key count. */
+    public static final int KEY_COUNT = 100;
+
+    /** Schema. */
+    public static SchemaDescriptor SCHEMA = new SchemaDescriptor(UUID.randomUUID(),
+        1,
+        new Column[] {new Column("key", NativeTypes.INTEGER, false)},
+        new Column[] {new Column("value", NativeTypes.INTEGER, false)}
+    );
+
+    /** Table command listener. */
+    private static PartitionListener commandListener;
+
+    /**
+     * Inisializes a table listener before tests.
+     */
+    @BeforeAll
+    public static void before() {
+        commandListener = new PartitionListener();
+    }
+
+    /**
+     * Insrets rows and checks them.
+     * All rows remove before return.
+     */
+    @Test
+    public void testInsertCommands() {
+        readAndCheck(false);
+
+        delete(false);
+
+        insert(false);
+
+        insert(true);
+
+        readAndCheck(true);
+
+        delete(true);
+    }
+
+    /**
+     * Upserts rows and checks them.
+     * All rows remove before return.
+     */
+    @Test
+    public void testUpsertValues() {
+        readAndCheck(false);
+
+        upsert();
+
+        readAndCheck(true);
+
+        delete(true);
+
+        readAndCheck(false);
+    }
+
+    /**
+     * Adds rows, replaces and checks them.
+     * All rows remove before return.
+     */
+    @Test
+    public void testReplaceCommand() {
+        upsert();
+
+        deleteExactValues(false);
+
+        replaceValues(true);
+
+        readAndCheck(true, i -> i + 1);
+
+        replaceValues(false);
+
+        readAndCheck(true, i -> i + 1);
+
+        deleteExactValues(true);
+
+        readAndCheck(false);
+    }
+
+    /**
+     * The test checks PutIfExist command.
+     * All rows remove before return.
+     */
+    @Test
+    public void testPutIfExistCommand() {
+        putIfExistValues(false);
+
+        readAndCheck(false);
+
+        upsert();
+
+        putIfExistValues(true);
+
+        readAndCheck(true, i -> i + 1);
+
+        getAndDeleteValues(true);
+
+        readAndCheck(false);
+
+        getAndDeleteValues(false);
+    }
+
+    /**
+     * The test checks GetAndReplace command.
+     * All rows remove before return.
+     */
+    @Test
+    public void testGetAndReplaceCommand() {
+        readAndCheck(false);
+
+        getAndUpsertValues(false);
+
+        readAndCheck(true);
+
+        getAndReplaceValues(true);
+
+        readAndCheck(true, i -> i + 1);
+
+        getAndUpsertValues(true);
+
+        readAndCheck(true);
+
+        deleteExactAllValues(true);
+
+        readAndCheck(false);
+
+        getAndReplaceValues(false);
+
+        deleteExactAllValues(false);
+    }
+
+    /**
+     * The test checks a batch upsert command.
+     * All rows remove before return.
+     */
+    @Test
+    public void testUpsertRowsBatchedAndCheck() {
+        readAll(false);
+
+        deleteAll(false);
+
+        upsertAll();
+
+        readAll(true);
+
+        deleteAll(true);
+
+        readAll(false);
+    }
+
+    /**
+     * The test checks a batch insert command.
+     * All rows remove before return.
+     */
+    @Test
+    public void testInsertRowsBatchedAndCheck() {
+        readAll(false);
+
+        deleteAll(false);
+
+        insertAll(false);
+
+        readAll(true);
+
+        insertAll(true);
+
+        deleteAll(true);
+
+        readAll(false);
+    }
+
+    /**
+     * Prepares a closure iterator for a specific batch operation.
+     *
+     * @param func The function prepare a closure for the operation.
+     * @param <T> Type of the operation.
+     * @return Closure iterator.
+     */
+    private <T extends Command> Iterator<CommandClosure<T>> batchIterator(Consumer<CommandClosure<T>> func) {
+        return new Iterator<CommandClosure<T>>() {
+            boolean moved;
+
+            @Override public boolean hasNext() {
+                return !moved;
+            }
+
+            @Override public CommandClosure<T> next() {
+                CommandClosure<T> clo = mock(CommandClosure.class);
+
+                func.accept(clo);
+
+                moved = true;
+
+                return clo;
+            }
+        };
+    }
+
+    /**
+     * Prepares a closure iterator for a specific operation.
+     *
+     * @param func The function prepare a closure for the operation.
+     * @param <T> Type of the operation.
+     * @return Closure iterator.
+     */
+    private <T extends Command> Iterator<CommandClosure<T>> iterator(BiConsumer<Integer, CommandClosure<T>> func) {
+        return new Iterator<CommandClosure<T>>() {
+            /** Iteration. */
+            private int i = 0;
+
+            /** {@inheritDoc} */
+            @Override public boolean hasNext() {
+                return i < KEY_COUNT;
+            }
+
+            /** {@inheritDoc} */
+            @Override public CommandClosure<T> next() {
+                CommandClosure<T> clo = mock(CommandClosure.class);
+
+                func.accept(i, clo);
+
+                i++;
+
+                return clo;
+            }
+        };
+    }
+
+    /**
+     * @param existed True if rows are existed, false otherwise.
+     */
+    private void insertAll(boolean existed) {
+        commandListener.onWrite(batchIterator(clo -> {
+            doAnswer(invocation -> {
+                MultiRowsResponse resp = invocation.getArgument(0);
+
+                if (existed) {
+                    assertEquals(KEY_COUNT, resp.getValues().size());
+
+                    for (BinaryRow binaryRow : resp.getValues()) {
+                        Row row = new Row(SCHEMA, binaryRow);
+
+                        int keyVal = row.intValue(0);
+
+                        assertTrue(keyVal < KEY_COUNT);
+                        assertEquals(keyVal, row.intValue(1));
+                    }
+                }
+                else
+                    assertTrue(resp.getValues().isEmpty());
+
+                return null;
+            }).when(clo).result(any(MultiRowsResponse.class));
+
+            Set<BinaryRow> rows = new HashSet<>(KEY_COUNT);
+
+            for (int i = 0; i < KEY_COUNT; i++)
+                rows.add(getTestRow(i, i));
+
+            when(clo.command()).thenReturn(new InsertAllCommand(rows));
+        }));
+    }
+
+    /**
+     * Upserts values from the listener in the batch operation.
+     */
+    private void upsertAll() {
+        commandListener.onWrite(batchIterator(clo -> {
+            doAnswer(invocation -> {
+                assertNull(invocation.getArgument(0));
+
+                return null;
+            }).when(clo).result(any());
+
+            Set<BinaryRow> rows = new HashSet<>(KEY_COUNT);
+
+            for (int i = 0; i < KEY_COUNT; i++)
+                rows.add(getTestRow(i, i));
+
+            when(clo.command()).thenReturn(new UpsertAllCommand(rows));
+        }));
+    }
+
+    /**
+     * @param existed True if rows are existed, false otherwise.
+     */
+    private void deleteAll(boolean existed) {
+        commandListener.onWrite(batchIterator(clo -> {
+            doAnswer(invocation -> {
+                MultiRowsResponse resp = invocation.getArgument(0);
+
+                if (existed) {
+                    assertEquals(KEY_COUNT, resp.getValues().size());
+
+                    for (BinaryRow binaryRow : resp.getValues()) {
+                        Row row = new Row(SCHEMA, binaryRow);
+
+                        int keyVal = row.intValue(0);
+
+                        assertTrue(keyVal < KEY_COUNT);
+                        assertEquals(keyVal, row.intValue(1));
+                    }
+                }
+                else
+                    assertTrue(resp.getValues().isEmpty());
+
+                return null;
+            }).when(clo).result(any(MultiRowsResponse.class));
+
+            Set<BinaryRow> keyRows = new HashSet<>(KEY_COUNT);
+
+            for (int i = 0; i < KEY_COUNT; i++)
+                keyRows.add(getTestKey(i));
+
+            when(clo.command()).thenReturn(new DeleteAllCommand(keyRows));
+        }));
+    }
+
+    /**
+     * @param existed True if rows are existed, false otherwise.
+     */
+    private void readAll(boolean existed) {
+        commandListener.onRead(batchIterator(clo -> {
+            doAnswer(invocation -> {
+                MultiRowsResponse resp = invocation.getArgument(0);
+
+                if (existed) {
+                    assertEquals(KEY_COUNT, resp.getValues().size());
+
+                    for (BinaryRow binaryRow : resp.getValues()) {
+                        Row row = new Row(SCHEMA, binaryRow);
+
+                        int keyVal = row.intValue(0);
+
+                        assertTrue(keyVal < KEY_COUNT);
+                        assertEquals(keyVal, row.intValue(1));
+                    }
+                }
+                else
+                    assertTrue(resp.getValues().isEmpty());
+
+                return null;
+            }).when(clo).result(any(MultiRowsResponse.class));
+
+            Set<BinaryRow> keyRows = new HashSet<>(KEY_COUNT);
+
+            for (int i = 0; i < KEY_COUNT; i++)
+                keyRows.add(getTestKey(i));
+
+            when(clo.command()).thenReturn(new GetAllCommand(keyRows));
+        }));
+    }
+
+    /**
+     * Upserts rows.
+     */
+    private void upsert() {
+        commandListener.onWrite(iterator((i, clo) -> {
+            when(clo.command()).thenReturn(new UpsertCommand(getTestRow(i, i)));
+
+            doAnswer(invocation -> {
+                assertNull(invocation.getArgument(0));
+
+                return null;
+            }).when(clo).result(any());
+        }));
+    }
+
+    /**
+     * @param existed True if rows are existed, false otherwise.
+     */
+    private void delete(boolean existed) {
+        commandListener.onWrite(iterator((i, clo) -> {
+            when(clo.command()).thenReturn(new DeleteCommand(getTestKey(i)));
+
+            doAnswer(invocation -> {
+                assertEquals(existed, invocation.getArgument(0));
+
+                return null;
+            }).when(clo).result(any());
+        }));
+    }
+
+    /**
+     * Reads rows from the listener and checks them.
+     *
+     * @param existed True if rows are existed, false otherwise.
+     */
+    private void readAndCheck(boolean existed) {
+        readAndCheck(existed, i -> i);
+    }
+
+    /**
+     * Reades rows from the listener and checks values as expected by a mapper.

Review comment:
       Done.







[GitHub] [ignite-3] vldpyatkov commented on a change in pull request #118: IGNITE-14239 Raft based implementation of atomic protocol

Posted by GitBox <gi...@apache.org>.
vldpyatkov commented on a change in pull request #118:
URL: https://github.com/apache/ignite-3/pull/118#discussion_r640452132



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListener.java
##########
@@ -57,10 +89,11 @@
             CommandClosure<WriteCommand> clo = iterator.next();
 
             if (clo.command() instanceof InsertCommand) {
-                BinaryRow previous = storage.putIfAbsent(
-                    extractAndWrapKey(((InsertCommand)clo.command()).getRow()),
-                    ((InsertCommand)clo.command()).getRow()
-                );
+                BinaryRow row = ((InsertCommand)clo.command()).getRow();
+
+                assert row.hasValue() : "Insert command should have a value.";
+
+                BinaryRow previous = storage.putIfAbsent(extractAndWrapKey(row), row);

Review comment:
       Added TODO and link to the issue.







[GitHub] [ignite-3] vldpyatkov commented on a change in pull request #118: IGNITE-14239 Raft based implementation of atomic protocol

Posted by GitBox <gi...@apache.org>.
vldpyatkov commented on a change in pull request #118:
URL: https://github.com/apache/ignite-3/pull/118#discussion_r640363943



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java
##########
@@ -172,22 +175,34 @@ public KVBinaryViewImpl(InternalTable tbl, SchemaRegistry schemaReg) {
 
     /** {@inheritDoc} */
     @Override public Collection<Tuple> removeAll(Collection<Tuple> keys) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        Objects.requireNonNull(keys);
+
+        return sync(removeAllAsync(keys));
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Tuple> removeAllAsync(Collection<Tuple> keys) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+    @Override public @NotNull CompletableFuture<Collection<Tuple>> removeAllAsync(Collection<Tuple> keys) {
+        Objects.requireNonNull(keys);

Review comment:
       Done.







[GitHub] [ignite-3] vldpyatkov commented on a change in pull request #118: IGNITE-14239 Raft based implementation of atomic protocol

Posted by GitBox <gi...@apache.org>.
vldpyatkov commented on a change in pull request #118:
URL: https://github.com/apache/ignite-3/pull/118#discussion_r643025953



##########
File path: modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
##########
@@ -0,0 +1,699 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.Row;
+import org.apache.ignite.internal.schema.RowAssembler;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAllCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndDeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndUpsertCommand;
+import org.apache.ignite.internal.table.distributed.command.GetCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertAllCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertCommand;
+import org.apache.ignite.internal.table.distributed.command.ReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.ReplaceIfExistCommand;
+import org.apache.ignite.internal.table.distributed.command.UpsertAllCommand;
+import org.apache.ignite.internal.table.distributed.command.UpsertCommand;
+import org.apache.ignite.internal.table.distributed.command.response.MultiRowsResponse;
+import org.apache.ignite.internal.table.distributed.command.response.SingleRowResponse;
+import org.apache.ignite.raft.client.Command;
+import org.apache.ignite.raft.client.service.CommandClosure;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * There are a tests for a table command listener.
+ */
+public class PartitionCommandListenerTest {
+    /** Key count. */
+    public static final int KEY_COUNT = 100;
+
+    /** Schema. */
+    public static SchemaDescriptor SCHEMA = new SchemaDescriptor(UUID.randomUUID(),
+        1,
+        new Column[] {new Column("key", NativeTypes.INTEGER, false)},
+        new Column[] {new Column("value", NativeTypes.INTEGER, false)}
+    );
+
+    /** Table command listener. */
+    private static PartitionListener commandListener;
+
+    /**
+     * Inisializes a table listener before tests.

Review comment:
       Done.







[GitHub] [ignite-3] vldpyatkov commented on a change in pull request #118: IGNITE-14239 Raft based implementation of atomic protocol

Posted by GitBox <gi...@apache.org>.
vldpyatkov commented on a change in pull request #118:
URL: https://github.com/apache/ignite-3/pull/118#discussion_r640458444



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListener.java
##########
@@ -90,38 +123,150 @@ else if (clo.command() instanceof ReplaceCommand) {
                     clo.success(false);
             }
             else if (clo.command() instanceof UpsertCommand) {
-                storage.put(
-                    extractAndWrapKey(((UpsertCommand)clo.command()).getRow()),
-                    ((UpsertCommand)clo.command()).getRow()
-                );
+                BinaryRow row = ((UpsertCommand)clo.command()).getRow();
+
+                assert row.hasValue() : "Upsert command should have a value.";
+
+                storage.put(extractAndWrapKey(row), row);
 
                 clo.success(null);
             }
-            else
-                assert false : "Command was not found [cmd=" + clo.command() + ']';
-        }
-    }
+            else if (clo.command() instanceof InsertAllCommand) {
+                Set<BinaryRow> rows = ((InsertAllCommand)clo.command()).getRows();
 
-    /**
-     * @param row Row.
-     * @return Extracted key.
-     */
-    @NotNull private boolean equalValues(@NotNull BinaryRow row, @NotNull BinaryRow row2) {
-        if (row.hasValue() ^ row2.hasValue())
-            return false;
+                assert rows != null && !rows.isEmpty();
 
-        return row.valueSlice().compareTo(row2.valueSlice()) == 0;
-    }
+                final Set<BinaryRow> res = rows.stream()
+                    .map(k -> storage.putIfAbsent(extractAndWrapKey(k), k) == null ? null : k)
+                    .filter(Objects::nonNull)
+                    .filter(BinaryRow::hasValue)
+                    .collect(Collectors.toSet());
 
-    /**
-     * @param row Row.
-     * @return Extracted key.
-     */
-    @NotNull private KeyWrapper extractAndWrapKey(@NotNull BinaryRow row) {
-        final byte[] bytes = new byte[row.keySlice().capacity()];
-        row.keySlice().get(bytes);
+                clo.success(new MultiRowsResponse(res));
+            }
+            else if (clo.command() instanceof UpsertAllCommand) {
+                Set<BinaryRow> rows = ((UpsertAllCommand)clo.command()).getRows();
 
-        return new KeyWrapper(bytes, row.hash());
+                assert rows != null && !rows.isEmpty();
+
+                rows.stream()

Review comment:
       Done.








[GitHub] [ignite-3] vldpyatkov commented on a change in pull request #118: IGNITE-14239 Raft based implementation of atomic protocol

Posted by GitBox <gi...@apache.org>.
vldpyatkov commented on a change in pull request #118:
URL: https://github.com/apache/ignite-3/pull/118#discussion_r640364273



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java
##########
@@ -172,22 +175,34 @@ public KVBinaryViewImpl(InternalTable tbl, SchemaRegistry schemaReg) {
 
     /** {@inheritDoc} */
     @Override public Collection<Tuple> removeAll(Collection<Tuple> keys) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        Objects.requireNonNull(keys);
+
+        return sync(removeAllAsync(keys));
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Tuple> removeAllAsync(Collection<Tuple> keys) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+    @Override public @NotNull CompletableFuture<Collection<Tuple>> removeAllAsync(Collection<Tuple> keys) {
+        Objects.requireNonNull(keys);
+
+        return tbl.deleteAll(keys.stream().map(k -> marsh.marshal(k, null)).collect(Collectors.toList()))
+            .thenApply(this::wrap)
+            .thenApply(ts -> ts.stream().filter(Objects::nonNull).map(TableRow::valueChunk).collect(Collectors.toList()));
     }
 
     /** {@inheritDoc} */
     @Override public Tuple getAndRemove(Tuple key) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        Objects.requireNonNull(key);

Review comment:
       Done.







[GitHub] [ignite-3] alievmirza commented on a change in pull request #118: IGNITE-14239 Raft based implementation of atomic protocol

Posted by GitBox <gi...@apache.org>.
alievmirza commented on a change in pull request #118:
URL: https://github.com/apache/ignite-3/pull/118#discussion_r638750335



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
##########
@@ -71,13 +85,37 @@ public InternalTableImpl(
 
     /** {@inheritDoc} */
     @Override public @NotNull CompletableFuture<BinaryRow> get(BinaryRow keyRow) {
-        return partitionMap.get(keyRow.hash() % partitions).<KVGetResponse>run(new GetCommand(keyRow))
-            .thenApply(KVGetResponse::getValue);
+        return partitionMap.get(keyRow.hash() % partitions).<SingleRowResponse>run(new GetCommand(keyRow))
+            .thenApply(response -> response.getValue());

Review comment:
       Here and below, this could be simplified to `.thenApply(SingleRowResponse::getValue);`
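
       A minimal sketch of the suggestion, for illustration only. `SingleRowResponseStub` and `MethodRefSketch` are hypothetical stand-ins, not classes from the PR; the only thing assumed about the real `SingleRowResponse` is that it exposes `getValue()`. Both forms should behave identically; the method reference is just shorter.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for the PR's SingleRowResponse; only getValue() is assumed. */
final class SingleRowResponseStub {
    private final String value;

    SingleRowResponseStub(String value) {
        this.value = value;
    }

    String getValue() {
        return value;
    }
}

final class MethodRefSketch {
    /** Lambda form, as written in the diff under review. */
    static CompletableFuture<String> withLambda(CompletableFuture<SingleRowResponseStub> fut) {
        return fut.thenApply(response -> response.getValue());
    }

    /** Method-reference form proposed in the review comment. */
    static CompletableFuture<String> withMethodRef(CompletableFuture<SingleRowResponseStub> fut) {
        return fut.thenApply(SingleRowResponseStub::getValue);
    }
}
```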

##########
File path: modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
##########
@@ -0,0 +1,705 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.Row;
+import org.apache.ignite.internal.schema.RowAssembler;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAllCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndDeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndUpsertCommand;
+import org.apache.ignite.internal.table.distributed.command.GetCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertAllCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertCommand;
+import org.apache.ignite.internal.table.distributed.command.ReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.ReplaceIfExistCommand;
+import org.apache.ignite.internal.table.distributed.command.UpsertAllCommand;
+import org.apache.ignite.internal.table.distributed.command.UpsertCommand;
+import org.apache.ignite.internal.table.distributed.command.response.MultiRowsResponse;
+import org.apache.ignite.internal.table.distributed.command.response.SingleRowResponse;
+import org.apache.ignite.raft.client.Command;
+import org.apache.ignite.raft.client.service.CommandClosure;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * There are a tests for a table command listener.
+ */
+public class PartitionCommandListenerTest {
+    /** Key count. */
+    public static final int KEY_COUNT = 100;
+
+    /** Schema. */
+    public static SchemaDescriptor SCHEMA = new SchemaDescriptor(UUID.randomUUID(),
+        1,
+        new Column[] {new Column("key", NativeTypes.INTEGER, false)},
+        new Column[] {new Column("value", NativeTypes.INTEGER, false)}
+    );
+
+    /** Table command listener. */
+    private static PartitionCommandListener commandListener;
+
+    /**
+     * Inisializes a table listener before tests.
+     */
+    @BeforeAll
+    public static void before() {
+        commandListener = new PartitionCommandListener();
+    }
+
+    /**
+     *

Review comment:
       Please add at least a short explanation of the test scenario; an empty Javadoc looks ugly.

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java
##########
@@ -172,22 +175,34 @@ public KVBinaryViewImpl(InternalTable tbl, SchemaRegistry schemaReg) {
 
     /** {@inheritDoc} */
     @Override public Collection<Tuple> removeAll(Collection<Tuple> keys) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        Objects.requireNonNull(keys);

Review comment:
       Let's add a `@NotNull` annotation for `keys` and also reflect that in the Javadoc. Let's add this annotation everywhere in `KVBinaryViewImpl` where it is appropriate.
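
       A rough sketch of what this could look like, under stated assumptions. The local `@NotNull` below is only a stand-in for `org.jetbrains.annotations.NotNull` so that the example compiles on its own, and `NotNullSketch.removeAll` is a hypothetical placeholder, not the method from `KVBinaryViewImpl`. The annotation documents the contract, while `Objects.requireNonNull` still enforces it at run time.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Objects;

/** Local stand-in for org.jetbrains.annotations.NotNull (assumption, to stay self-contained). */
@interface NotNull {
}

final class NotNullSketch {
    /**
     * Removes the given keys (placeholder logic: the keys are simply echoed back).
     *
     * @param keys Keys to remove, must not be {@code null}.
     * @return Collection of processed keys, never {@code null}.
     */
    static @NotNull Collection<String> removeAll(@NotNull Collection<String> keys) {
        // The annotation is documentation and tooling input; this check fails fast at run time.
        Objects.requireNonNull(keys, "keys");

        return new ArrayList<>(keys);
    }
}
```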

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java
##########
@@ -172,22 +175,34 @@ public KVBinaryViewImpl(InternalTable tbl, SchemaRegistry schemaReg) {
 
     /** {@inheritDoc} */
     @Override public Collection<Tuple> removeAll(Collection<Tuple> keys) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        Objects.requireNonNull(keys);
+
+        return sync(removeAllAsync(keys));
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Tuple> removeAllAsync(Collection<Tuple> keys) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+    @Override public @NotNull CompletableFuture<Collection<Tuple>> removeAllAsync(Collection<Tuple> keys) {
+        Objects.requireNonNull(keys);

Review comment:
       Let's add a `@NotNull` annotation for `keys` and also reflect that in the Javadoc.

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
##########
@@ -71,13 +85,37 @@ public InternalTableImpl(
 
     /** {@inheritDoc} */
     @Override public @NotNull CompletableFuture<BinaryRow> get(BinaryRow keyRow) {
-        return partitionMap.get(keyRow.hash() % partitions).<KVGetResponse>run(new GetCommand(keyRow))
-            .thenApply(KVGetResponse::getValue);
+        return partitionMap.get(keyRow.hash() % partitions).<SingleRowResponse>run(new GetCommand(keyRow))
+            .thenApply(response -> response.getValue());
     }
 
     /** {@inheritDoc} */
     @Override public @NotNull CompletableFuture<Collection<BinaryRow>> getAll(Collection<BinaryRow> keyRows) {
-        return null;
+        HashMap<Integer, HashSet<BinaryRow>> setByPartition = new HashMap<>();
+
+        for (BinaryRow keyRow : keyRows) {
+            setByPartition.computeIfAbsent(keyRow.hash() % partitions, HashSet::new)
+                .add(keyRow);
+        }
+
+        CompletableFuture<MultiRowsResponse>[] futures = new CompletableFuture[setByPartition.size()];
+
+        int batchNum = 0;
+
+        for (Map.Entry<Integer, HashSet<BinaryRow>> partToRows : setByPartition.entrySet()) {
+            futures[batchNum] = partitionMap.get(partToRows.getKey()).run(new GetAllCommand(partToRows.getValue()));
+
+            batchNum++;
+        }
+
+        CompletableFuture<Collection<BinaryRow>> future = CompletableFuture.allOf(futures)

Review comment:
       The local variable is redundant.
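
A sketch of returning the chained future directly instead of assigning it to a local first. `AllOfSketch`, `collect` and the use of `String` rows are assumptions for illustration; the PR works with `BinaryRow` and `MultiRowsResponse`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical helper, used only to illustrate the shape of the return statement. */
class AllOfSketch {
    /** Merges per-partition results and returns the chained future without a local variable. */
    static CompletableFuture<Collection<String>> collect(List<CompletableFuture<List<String>>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(ignored -> {
                List<String> rows = new ArrayList<>();

                // Every future has completed at this point, so join() does not block.
                for (CompletableFuture<List<String>> fut : futures)
                    rows.addAll(fut.join());

                return rows;
            });
    }
}
```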

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
##########
@@ -71,13 +85,37 @@ public InternalTableImpl(
 
     /** {@inheritDoc} */
     @Override public @NotNull CompletableFuture<BinaryRow> get(BinaryRow keyRow) {
-        return partitionMap.get(keyRow.hash() % partitions).<KVGetResponse>run(new GetCommand(keyRow))
-            .thenApply(KVGetResponse::getValue);
+        return partitionMap.get(keyRow.hash() % partitions).<SingleRowResponse>run(new GetCommand(keyRow))
+            .thenApply(response -> response.getValue());
     }
 
     /** {@inheritDoc} */
     @Override public @NotNull CompletableFuture<Collection<BinaryRow>> getAll(Collection<BinaryRow> keyRows) {
-        return null;
+        HashMap<Integer, HashSet<BinaryRow>> setByPartition = new HashMap<>();

Review comment:
       Let's rename it: `keyRowsByPartition` seems better, `setByPartition` is meaningless.
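
A sketch of the grouping step with the suggested name. `GroupingSketch` and the `String` keys are hypothetical stand-ins for `BinaryRow`. The sketch also deviates from the PR in two ways that are my own choices: it uses `Math.floorMod` so that a negative hash still maps to a valid partition, and it uses a lambda instead of `HashSet::new`, because with an `Integer` key that method reference resolves to the `HashSet(int initialCapacity)` constructor.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper, used only to illustrate grouping keys by partition. */
class GroupingSketch {
    static Map<Integer, Set<String>> groupByPartition(Collection<String> keyRows, int partitions) {
        Map<Integer, Set<String>> keyRowsByPartition = new HashMap<>();

        for (String keyRow : keyRows) {
            // floorMod keeps the index in [0, partitions) even when hashCode() is negative.
            int part = Math.floorMod(keyRow.hashCode(), partitions);

            // The lambda ignores the key, so the partition number is not used as a capacity.
            keyRowsByPartition.computeIfAbsent(part, p -> new HashSet<>()).add(keyRow);
        }

        return keyRowsByPartition;
    }
}
```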

##########
File path: modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
##########
@@ -242,71 +271,232 @@ public void partitionedTable() {
             }
         });
 
-        for (int i = 0; i < PARTS * 10; i++) {
-            tbl.kvView().putIfAbsent(
-                tbl.kvView().tupleBuilder()
+        partitionedTableView(tbl, PARTS * 10);
+
+        partitionedTableKVBinaryView(tbl.kvView(), PARTS * 10);
+    }
+
+    /**
+     * Checks operation over row table view.
+     *
+     * @param view Table view.
+     * @param keysCnt Count of keys.
+     */
+    public void partitionedTableView(Table view, int keysCnt) {
+        LOG.info("Tes for Table view [keys=" + keysCnt + ']');

Review comment:
       Typo: `Tes` should be `Test`.

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
##########
@@ -71,13 +85,37 @@ public InternalTableImpl(
 

Review comment:
       A comment about the `InternalTableImpl` constructor: let's remove `this.partitions = partitions;`. AFAIK `partitions` always equals `partitionMap.size()`.
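
A sketch of deriving the partition count from the map instead of storing it in a separate field. `PartitionedTableSketch`, `partitionFor` and the `String` values are hypothetical; the sketch assumes the map keys are exactly `0..size-1`, which is what the comment above implies but which is not verified here. `Math.floorMod` is my own choice and is not taken from the PR.

```java
import java.util.Map;

/** Hypothetical class, used only to illustrate dropping the separate partitions field. */
class PartitionedTableSketch {
    /** Partition index to partition handle (a String placeholder in this sketch). */
    private final Map<Integer, String> partitionMap;

    PartitionedTableSketch(Map<Integer, String> partitionMap) {
        this.partitionMap = partitionMap;
    }

    /** Picks the partition for a key hash; the partition count is the size of the map. */
    String partitionFor(int keyHash) {
        return partitionMap.get(Math.floorMod(keyHash, partitionMap.size()));
    }
}
```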

##########
File path: modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
##########
@@ -242,71 +271,232 @@ public void partitionedTable() {
             }
         });
 
-        for (int i = 0; i < PARTS * 10; i++) {
-            tbl.kvView().putIfAbsent(
-                tbl.kvView().tupleBuilder()
+        partitionedTableView(tbl, PARTS * 10);
+
+        partitionedTableKVBinaryView(tbl.kvView(), PARTS * 10);
+    }
+
+    /**
+     * Checks operation over row table view.
+     *
+     * @param view Table view.
+     * @param keysCnt Count of keys.
+     */
+    public void partitionedTableView(Table view, int keysCnt) {
+        LOG.info("Tes for Table view [keys=" + keysCnt + ']');
+
+        for (int i = 0; i < keysCnt; i++) {
+            view.insert(view.tupleBuilder()
+                .set("key", Long.valueOf(i))
+                .set("value", Long.valueOf(i + 2))
+                .build()
+            );
+        }
+
+        for (int i = 0; i < keysCnt; i++) {
+            Tuple entry = view.get(view.tupleBuilder()
+                .set("key", Long.valueOf(i))
+                .build());
+
+            assertEquals(Long.valueOf(i + 2), entry.longValue("value"));
+        }
+
+        for (int i = 0; i < keysCnt; i++) {
+            view.upsert(view.tupleBuilder()
+                .set("key", Long.valueOf(i))
+                .set("value", Long.valueOf(i + 5))
+                .build()
+            );
+
+            Tuple entry = view.get(view.tupleBuilder()
+                .set("key", Long.valueOf(i))
+                .build());
+
+            assertEquals(Long.valueOf(i + 5), entry.longValue("value"));
+        }
+
+        HashSet<Tuple> keys = new HashSet<>();
+
+        for (int i = 0; i < keysCnt; i++) {
+            keys.add(view.tupleBuilder()
+                .set("key", Long.valueOf(i))
+                .build());
+        }
+
+        Collection<Tuple> entries = view.getAll(keys);
+
+        assertEquals(keysCnt, entries.size());
+
+        for (int i = 0; i < keysCnt; i++) {
+            boolean res = view.replace(
+                view.tupleBuilder()
                     .set("key", Long.valueOf(i))
+                    .set("value", Long.valueOf(i + 5))
                     .build(),
-                tbl.kvView().tupleBuilder()
+                view.tupleBuilder()
+                    .set("key", Long.valueOf(i))
                     .set("value", Long.valueOf(i + 2))
                     .build());
+
+            assertTrue(res);
+        }
+
+        for (int i = 0; i < keysCnt; i++) {
+            boolean res = view.delete(view.tupleBuilder()
+                .set("key", Long.valueOf(i))
+                .build());
+
+            assertTrue(res);
+
+            Tuple entry = view.get(view.tupleBuilder()
+                .set("key", Long.valueOf(i))
+                .build());
+
+            assertNull(entry);
         }
 
-        for (int i = 0; i < PARTS * 10; i++) {
-            Tuple entry = tbl.kvView().get(
-                tbl.kvView().tupleBuilder()
+        ArrayList<Tuple> batch = new ArrayList<>(keysCnt);
+
+        for (int i = 0; i < keysCnt; i++) {
+            batch.add(view.tupleBuilder()
+                .set("key", Long.valueOf(i))
+                .set("value", Long.valueOf(i + 2))
+                .build());
+        }
+
+        view.upsertAll(batch);
+
+        for (int i = 0; i < keysCnt; i++) {
+            Tuple entry = view.get(view.tupleBuilder()
+                .set("key", Long.valueOf(i))
+                .build());
+
+            assertEquals(Long.valueOf(i + 2), entry.longValue("value"));
+        }
+
+        view.deleteAll(keys);
+
+        for (Tuple key : keys) {
+            Tuple entry = view.get(key);
+
+            assertNull(entry);
+        }
+    }
+
+    /**
+     * Checks operation over key-value binary table view.
+     *
+     * @param view Table biew.

Review comment:
       Typo: `biew` should be `view`.

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java
##########
@@ -172,22 +175,34 @@ public KVBinaryViewImpl(InternalTable tbl, SchemaRegistry schemaReg) {
 
     /** {@inheritDoc} */
     @Override public Collection<Tuple> removeAll(Collection<Tuple> keys) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        Objects.requireNonNull(keys);
+
+        return sync(removeAllAsync(keys));
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Tuple> removeAllAsync(Collection<Tuple> keys) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+    @Override public @NotNull CompletableFuture<Collection<Tuple>> removeAllAsync(Collection<Tuple> keys) {
+        Objects.requireNonNull(keys);
+
+        return tbl.deleteAll(keys.stream().map(k -> marsh.marshal(k, null)).collect(Collectors.toList()))
+            .thenApply(this::wrap)
+            .thenApply(ts -> ts.stream().filter(Objects::nonNull).map(TableRow::valueChunk).collect(Collectors.toList()));

Review comment:
       I would rename `ts` to `t`.

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java
##########
@@ -172,22 +175,34 @@ public KVBinaryViewImpl(InternalTable tbl, SchemaRegistry schemaReg) {
 
     /** {@inheritDoc} */
     @Override public Collection<Tuple> removeAll(Collection<Tuple> keys) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        Objects.requireNonNull(keys);
+
+        return sync(removeAllAsync(keys));
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Tuple> removeAllAsync(Collection<Tuple> keys) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+    @Override public @NotNull CompletableFuture<Collection<Tuple>> removeAllAsync(Collection<Tuple> keys) {
+        Objects.requireNonNull(keys);
+
+        return tbl.deleteAll(keys.stream().map(k -> marsh.marshal(k, null)).collect(Collectors.toList()))
+            .thenApply(this::wrap)
+            .thenApply(ts -> ts.stream().filter(Objects::nonNull).map(TableRow::valueChunk).collect(Collectors.toList()));
     }
 
     /** {@inheritDoc} */
     @Override public Tuple getAndRemove(Tuple key) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        Objects.requireNonNull(key);

Review comment:
       Let's add a `@NotNull` annotation for `key` and also reflect that in the javadoc.

##########
File path: modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
##########
@@ -0,0 +1,705 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.Row;
+import org.apache.ignite.internal.schema.RowAssembler;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAllCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndDeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndUpsertCommand;
+import org.apache.ignite.internal.table.distributed.command.GetCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertAllCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertCommand;
+import org.apache.ignite.internal.table.distributed.command.ReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.ReplaceIfExistCommand;
+import org.apache.ignite.internal.table.distributed.command.UpsertAllCommand;
+import org.apache.ignite.internal.table.distributed.command.UpsertCommand;
+import org.apache.ignite.internal.table.distributed.command.response.MultiRowsResponse;
+import org.apache.ignite.internal.table.distributed.command.response.SingleRowResponse;
+import org.apache.ignite.raft.client.Command;
+import org.apache.ignite.raft.client.service.CommandClosure;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * There are a tests for a table command listener.
+ */
+public class PartitionCommandListenerTest {
+    /** Key count. */
+    public static final int KEY_COUNT = 100;
+
+    /** Schema. */
+    public static SchemaDescriptor SCHEMA = new SchemaDescriptor(UUID.randomUUID(),
+        1,
+        new Column[] {new Column("key", NativeTypes.INTEGER, false)},
+        new Column[] {new Column("value", NativeTypes.INTEGER, false)}
+    );
+
+    /** Table command listener. */
+    private static PartitionCommandListener commandListener;
+
+    /**
+     * Inisializes a table listener before tests.
+     */
+    @BeforeAll
+    public static void before() {
+        commandListener = new PartitionCommandListener();
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testInsertCommands() {
+        readAndChak(false);
+
+        delete(false);
+
+        insert(false);
+
+        insert(true);
+
+        readAndChak(true);
+
+        delete(true);
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testUpsertValues() {
+        readAndChak(false);
+
+        upsert();
+
+        readAndChak(true);
+
+        delete(true);
+
+        readAndChak(false);
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testReplaceCommand() {
+        upsert();
+
+        deleteExactValues(false);
+
+        replaceValues(true);
+
+        readAndChak(true, i -> i + 1);
+
+        replaceValues(false);
+
+        readAndChak(true, i -> i + 1);
+
+        deleteExactValues(true);
+
+        readAndChak(false);
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testPutIfExistCommand() {
+        putIfExistValues(false);
+
+        readAndChak(false);
+
+        upsert();
+
+        putIfExistValues(true);
+
+        readAndChak(true, i -> i + 1);
+
+        getAndDeleteValues(true);
+
+        readAndChak(false);
+
+        getAndDeleteValues(false);
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testGetAndReplaceCommand() {
+        readAndChak(false);
+
+        getAndUpsertValues(false);
+
+        readAndChak(true);
+
+        getAndReplaceValues(true);
+
+        readAndChak(true, i -> i + 1);
+
+        getAndUpsertValues(true);
+
+        readAndChak(true);
+
+        deleteExactAllValues(true);
+
+        readAndChak(false);
+
+        getAndReplaceValues(false);
+
+        deleteExactAllValues(false);
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testUpsertRowsBatchedAndCheck() {
+        readAll(false);
+
+        deleteAll(false);
+
+        upsertAll();
+
+        readAll(true);
+
+        deleteAll(true);
+
+        readAll(false);
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testInsertRowsBatchedAndCheck() {
+        readAll(false);
+
+        deleteAll(false);
+
+        insertAll(false);
+
+        readAll(true);
+
+        insertAll(true);
+
+        deleteAll(true);
+
+        readAll(false);
+    }
+
+    /**
+     * Prepares a closure iterator for a specific batch operation.
+     *
+     * @param func The function prepare a closure for the operation.
+     * @param <T> Type of the operation.
+     * @return Closure iterator.
+     */
+    private <T extends Command> Iterator<CommandClosure<T>> batchIterator(Consumer<CommandClosure<T>> func) {
+        return new Iterator<CommandClosure<T>>() {
+            boolean moved;
+
+            @Override public boolean hasNext() {
+                return !moved;
+            }
+
+            @Override public CommandClosure<T> next() {
+                CommandClosure<T> clo = mock(CommandClosure.class);
+
+                doAnswer(invocation -> {
+                    fail("Exception happened: " + invocation.getArgument(0));
+
+                    return null;
+                }).when(clo).failure(any());
+
+                func.accept(clo);
+
+                moved = true;
+
+                return clo;
+            }
+        };
+    }
+
+    /**
+     * Prepares a closure iterator for a specific operation.
+     *
+     * @param func The function prepare a closure for the operation.
+     * @param <T> Type of the operation.
+     * @return Closure iterator.
+     */
+    private <T extends Command> Iterator<CommandClosure<T>> iterator(BiConsumer<Integer, CommandClosure<T>> func) {
+        return new Iterator<CommandClosure<T>>() {
+            /** Iteration. */
+            private int i = 0;
+
+            /** {@inheritDoc} */
+            @Override public boolean hasNext() {
+                return i < KEY_COUNT;
+            }
+
+            /** {@inheritDoc} */
+            @Override public CommandClosure<T> next() {
+                CommandClosure<T> clo = mock(CommandClosure.class);
+
+                doAnswer(invocation -> {
+                    fail("Exception happened: " + invocation.getArgument(0));
+
+                    return null;
+                }).when(clo).failure(any());
+
+                func.accept(i, clo);
+
+                i++;
+
+                return clo;
+            }
+        };
+    }
+
+    /**
+     * @param existed True if rows are existed, false otherwise.
+     */
+    private void insertAll(boolean existed) {
+        commandListener.onWrite(batchIterator(clo -> {
+            doAnswer(invocation -> {
+                MultiRowsResponse resp = invocation.getArgument(0);
+
+                if (existed) {
+                    assertEquals(KEY_COUNT, resp.getValues().size());
+
+                    for (BinaryRow binaryRow : resp.getValues()) {
+                        Row row = new Row(SCHEMA, binaryRow);
+
+                        int keyVal = row.intValue(0);
+
+                        assertTrue(keyVal < KEY_COUNT);
+                        assertEquals(keyVal, row.intValue(1));
+                    }
+                }
+                else
+                    assertTrue(resp.getValues().isEmpty());
+
+                return null;
+            }).when(clo).success(any(MultiRowsResponse.class));
+
+            Set<BinaryRow> rows = new HashSet<>(KEY_COUNT);
+
+            for (int i = 0; i < KEY_COUNT; i++)
+                rows.add(getTestRow(i, i));
+
+            when(clo.command()).thenReturn(new InsertAllCommand(rows));
+        }));
+    }
+
+    /**
+     * Upserts values from the listener in the batch operation.
+     */
+    private void upsertAll() {
+        commandListener.onWrite(batchIterator(clo -> {
+            doAnswer(invocation -> {
+                assertNull(invocation.getArgument(0));
+
+                return null;
+            }).when(clo).success(any());
+
+            Set<BinaryRow> rows = new HashSet<>(KEY_COUNT);
+
+            for (int i = 0; i < KEY_COUNT; i++)
+                rows.add(getTestRow(i, i));
+
+            when(clo.command()).thenReturn(new UpsertAllCommand(rows));
+        }));
+    }
+
+    /**
+     * @param existed True if rows are existed, false otherwise.
+     */
+    private void deleteAll(boolean existed) {
+        commandListener.onWrite(batchIterator(clo -> {
+            doAnswer(invocation -> {
+                MultiRowsResponse resp = invocation.getArgument(0);
+
+                if (existed) {
+                    assertEquals(KEY_COUNT, resp.getValues().size());
+
+                    for (BinaryRow binaryRow : resp.getValues()) {
+                        Row row = new Row(SCHEMA, binaryRow);
+
+                        int keyVal = row.intValue(0);
+
+                        assertTrue(keyVal < KEY_COUNT);
+                        assertEquals(keyVal, row.intValue(1));
+                    }
+                }
+                else
+                    assertTrue(resp.getValues().isEmpty());
+
+                return null;
+            }).when(clo).success(any(MultiRowsResponse.class));
+
+            Set<BinaryRow> keyRows = new HashSet<>(KEY_COUNT);
+
+            for (int i = 0; i < KEY_COUNT; i++)
+                keyRows.add(getTestKey(i));
+
+            when(clo.command()).thenReturn(new DeleteAllCommand(keyRows));
+        }));
+    }
+
+    /**
+     * @param existed True if rows are existed, false otherwise.
+     */
+    private void readAll(boolean existed) {
+        commandListener.onRead(batchIterator(clo -> {
+            doAnswer(invocation -> {
+                MultiRowsResponse resp = invocation.getArgument(0);
+
+                if (existed) {
+                    assertEquals(KEY_COUNT, resp.getValues().size());
+
+                    for (BinaryRow binaryRow : resp.getValues()) {
+                        Row row = new Row(SCHEMA, binaryRow);
+
+                        int keyVal = row.intValue(0);
+
+                        assertTrue(keyVal < KEY_COUNT);
+                        assertEquals(keyVal, row.intValue(1));
+                    }
+                }
+                else
+                    assertTrue(resp.getValues().isEmpty());
+
+                return null;
+            }).when(clo).success(any(MultiRowsResponse.class));
+
+            Set<BinaryRow> keyRows = new HashSet<>(KEY_COUNT);
+
+            for (int i = 0; i < KEY_COUNT; i++)
+                keyRows.add(getTestKey(i));
+
+            when(clo.command()).thenReturn(new GetAllCommand(keyRows));
+        }));
+    }
+
+    /**
+     * Upserts rows.
+     */
+    private void upsert() {
+        commandListener.onWrite(iterator((i, clo) -> {
+            when(clo.command()).thenReturn(new UpsertCommand(getTestRow(i, i)));
+
+            doAnswer(invocation -> {
+                assertNull(invocation.getArgument(0));
+
+                return null;
+            }).when(clo).success(any());
+        }));
+    }
+
+    /**
+     * @param existed True if rows are existed, false otherwise.
+     */
+    private void delete(boolean existed) {
+        commandListener.onWrite(iterator((i, clo) -> {
+            when(clo.command()).thenReturn(new DeleteCommand(getTestKey(i)));
+
+            doAnswer(invocation -> {
+                assertEquals(existed, invocation.getArgument(0));
+
+                return null;
+            }).when(clo).success(any());
+        }));
+    }
+
+    /**
+     * Reads rows from the listener and checks them.
+     *
+     * @param existed True if rows are existed, false otherwise.
+     */
+    private void readAndChak(boolean existed) {
+        readAndChak(existed, i -> i);
+    }
+
+    /**
+     * Reades rows from the listener and checks values as expected by a mapper.
+     *
+     * @param existed True if rows are existed, false otherwise.
+     * @param keyValueMapper Mapper a key to the value which will be expected.
+     */
+    private void readAndChak(boolean existed, Function<Integer, Integer> keyValueMapper) {

Review comment:
       Typo: `readAndChak` should be `readAndCheck`.

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java
##########
@@ -172,22 +175,34 @@ public KVBinaryViewImpl(InternalTable tbl, SchemaRegistry schemaReg) {
 
     /** {@inheritDoc} */
     @Override public Collection<Tuple> removeAll(Collection<Tuple> keys) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        Objects.requireNonNull(keys);
+
+        return sync(removeAllAsync(keys));
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Tuple> removeAllAsync(Collection<Tuple> keys) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+    @Override public @NotNull CompletableFuture<Collection<Tuple>> removeAllAsync(Collection<Tuple> keys) {
+        Objects.requireNonNull(keys);
+
+        return tbl.deleteAll(keys.stream().map(k -> marsh.marshal(k, null)).collect(Collectors.toList()))
+            .thenApply(this::wrap)
+            .thenApply(ts -> ts.stream().filter(Objects::nonNull).map(TableRow::valueChunk).collect(Collectors.toList()));
     }
 
     /** {@inheritDoc} */
     @Override public Tuple getAndRemove(Tuple key) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        Objects.requireNonNull(key);
+
+        return sync(getAndRemoveAsync(key));
     }
 
     /** {@inheritDoc} */
     @Override public @NotNull CompletableFuture<Tuple> getAndRemoveAsync(Tuple key) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        Objects.requireNonNull(key);

Review comment:
       Let's add a `@NotNull` annotation for `key` and also reflect that in the javadoc.

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java
##########
@@ -289,4 +306,25 @@ protected TableRow wrap(BinaryRow row) {
 
         return new TableRow(schema, new Row(schema, row));
     }
+
+    /**
+     * @param rows Binary rows.
+     * @return Table rows.
+     */
+    private Collection<TableRow> wrap(Collection<BinaryRow> rows) {

Review comment:
       Let's add javadoc here and for `KVBinaryViewImpl#wrap(org.apache.ignite.internal.schema.BinaryRow)`







[GitHub] [ignite-3] vldpyatkov commented on a change in pull request #118: IGNITE-14239 Raft based implementation of atomic protocol

Posted by GitBox <gi...@apache.org>.
vldpyatkov commented on a change in pull request #118:
URL: https://github.com/apache/ignite-3/pull/118#discussion_r640374871



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListener.java
##########
@@ -163,4 +308,26 @@ else if (clo.command() instanceof UpsertCommand) {
             return hash;
         }
     }
+
+    /**
+     * @param row Row.

Review comment:
       Rewrote javadoc.







[GitHub] [ignite-3] vldpyatkov commented on a change in pull request #118: IGNITE-14239 Raft based implementation of atomic protocol

Posted by GitBox <gi...@apache.org>.
vldpyatkov commented on a change in pull request #118:
URL: https://github.com/apache/ignite-3/pull/118#discussion_r640351899



##########
File path: modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
##########
@@ -0,0 +1,705 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.Row;
+import org.apache.ignite.internal.schema.RowAssembler;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAllCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndDeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndUpsertCommand;
+import org.apache.ignite.internal.table.distributed.command.GetCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertAllCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertCommand;
+import org.apache.ignite.internal.table.distributed.command.ReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.ReplaceIfExistCommand;
+import org.apache.ignite.internal.table.distributed.command.UpsertAllCommand;
+import org.apache.ignite.internal.table.distributed.command.UpsertCommand;
+import org.apache.ignite.internal.table.distributed.command.response.MultiRowsResponse;
+import org.apache.ignite.internal.table.distributed.command.response.SingleRowResponse;
+import org.apache.ignite.raft.client.Command;
+import org.apache.ignite.raft.client.service.CommandClosure;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * There are a tests for a table command listener.
+ */
+public class PartitionCommandListenerTest {
+    /** Key count. */
+    public static final int KEY_COUNT = 100;
+
+    /** Schema. */
+    public static SchemaDescriptor SCHEMA = new SchemaDescriptor(UUID.randomUUID(),
+        1,
+        new Column[] {new Column("key", NativeTypes.INTEGER, false)},
+        new Column[] {new Column("value", NativeTypes.INTEGER, false)}
+    );
+
+    /** Table command listener. */
+    private static PartitionCommandListener commandListener;
+
+    /**
+     * Inisializes a table listener before tests.
+     */
+    @BeforeAll
+    public static void before() {
+        commandListener = new PartitionCommandListener();
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testInsertCommands() {
+        readAndChak(false);
+
+        delete(false);
+
+        insert(false);
+
+        insert(true);
+
+        readAndChak(true);
+
+        delete(true);
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testUpsertValues() {
+        readAndChak(false);
+
+        upsert();
+
+        readAndChak(true);
+
+        delete(true);
+
+        readAndChak(false);
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testReplaceCommand() {
+        upsert();
+
+        deleteExactValues(false);
+
+        replaceValues(true);
+
+        readAndChak(true, i -> i + 1);
+
+        replaceValues(false);
+
+        readAndChak(true, i -> i + 1);
+
+        deleteExactValues(true);
+
+        readAndChak(false);
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testPutIfExistCommand() {
+        putIfExistValues(false);
+
+        readAndChak(false);
+
+        upsert();
+
+        putIfExistValues(true);
+
+        readAndChak(true, i -> i + 1);
+
+        getAndDeleteValues(true);
+
+        readAndChak(false);
+
+        getAndDeleteValues(false);
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testGetAndReplaceCommand() {
+        readAndChak(false);
+
+        getAndUpsertValues(false);
+
+        readAndChak(true);
+
+        getAndReplaceValues(true);
+
+        readAndChak(true, i -> i + 1);
+
+        getAndUpsertValues(true);
+
+        readAndChak(true);
+
+        deleteExactAllValues(true);
+
+        readAndChak(false);
+
+        getAndReplaceValues(false);
+
+        deleteExactAllValues(false);
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testUpsertRowsBatchedAndCheck() {
+        readAll(false);
+
+        deleteAll(false);
+
+        upsertAll();
+
+        readAll(true);
+
+        deleteAll(true);
+
+        readAll(false);
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testInsertRowsBatchedAndCheck() {
+        readAll(false);
+
+        deleteAll(false);
+
+        insertAll(false);
+
+        readAll(true);
+
+        insertAll(true);
+
+        deleteAll(true);
+
+        readAll(false);
+    }
+
+    /**
+     * Prepares a closure iterator for a specific batch operation.
+     *
+     * @param func The function that prepares a closure for the operation.
+     * @param <T> Type of the operation.
+     * @return Closure iterator.
+     */
+    private <T extends Command> Iterator<CommandClosure<T>> batchIterator(Consumer<CommandClosure<T>> func) {
+        return new Iterator<CommandClosure<T>>() {
+            boolean moved;
+
+            @Override public boolean hasNext() {
+                return !moved;
+            }
+
+            @Override public CommandClosure<T> next() {
+                CommandClosure<T> clo = mock(CommandClosure.class);
+
+                doAnswer(invocation -> {
+                    fail("Exception happened: " + invocation.getArgument(0));
+
+                    return null;
+                }).when(clo).failure(any());
+
+                func.accept(clo);
+
+                moved = true;
+
+                return clo;
+            }
+        };
+    }
+
+    /**
+     * Prepares a closure iterator for a specific operation.
+     *
+     * @param func The function that prepares a closure for the operation.
+     * @param <T> Type of the operation.
+     * @return Closure iterator.
+     */
+    private <T extends Command> Iterator<CommandClosure<T>> iterator(BiConsumer<Integer, CommandClosure<T>> func) {
+        return new Iterator<CommandClosure<T>>() {
+            /** Iteration. */
+            private int i = 0;
+
+            /** {@inheritDoc} */
+            @Override public boolean hasNext() {
+                return i < KEY_COUNT;
+            }
+
+            /** {@inheritDoc} */
+            @Override public CommandClosure<T> next() {
+                CommandClosure<T> clo = mock(CommandClosure.class);
+
+                doAnswer(invocation -> {
+                    fail("Exception happened: " + invocation.getArgument(0));
+
+                    return null;
+                }).when(clo).failure(any());
+
+                func.accept(i, clo);
+
+                i++;
+
+                return clo;
+            }
+        };
+    }
+
+    /**
+     * @param existed True if the rows exist, false otherwise.
+     */
+    private void insertAll(boolean existed) {
+        commandListener.onWrite(batchIterator(clo -> {
+            doAnswer(invocation -> {
+                MultiRowsResponse resp = invocation.getArgument(0);
+
+                if (existed) {
+                    assertEquals(KEY_COUNT, resp.getValues().size());
+
+                    for (BinaryRow binaryRow : resp.getValues()) {
+                        Row row = new Row(SCHEMA, binaryRow);
+
+                        int keyVal = row.intValue(0);
+
+                        assertTrue(keyVal < KEY_COUNT);
+                        assertEquals(keyVal, row.intValue(1));
+                    }
+                }
+                else
+                    assertTrue(resp.getValues().isEmpty());
+
+                return null;
+            }).when(clo).success(any(MultiRowsResponse.class));
+
+            Set<BinaryRow> rows = new HashSet<>(KEY_COUNT);
+
+            for (int i = 0; i < KEY_COUNT; i++)
+                rows.add(getTestRow(i, i));
+
+            when(clo.command()).thenReturn(new InsertAllCommand(rows));
+        }));
+    }
+
+    /**
+     * Upserts values from the listener in the batch operation.
+     */
+    private void upsertAll() {
+        commandListener.onWrite(batchIterator(clo -> {
+            doAnswer(invocation -> {
+                assertNull(invocation.getArgument(0));
+
+                return null;
+            }).when(clo).success(any());
+
+            Set<BinaryRow> rows = new HashSet<>(KEY_COUNT);
+
+            for (int i = 0; i < KEY_COUNT; i++)
+                rows.add(getTestRow(i, i));
+
+            when(clo.command()).thenReturn(new UpsertAllCommand(rows));
+        }));
+    }
+
+    /**
+     * @param existed True if the rows exist, false otherwise.
+     */
+    private void deleteAll(boolean existed) {
+        commandListener.onWrite(batchIterator(clo -> {
+            doAnswer(invocation -> {
+                MultiRowsResponse resp = invocation.getArgument(0);
+
+                if (existed) {
+                    assertEquals(KEY_COUNT, resp.getValues().size());
+
+                    for (BinaryRow binaryRow : resp.getValues()) {
+                        Row row = new Row(SCHEMA, binaryRow);
+
+                        int keyVal = row.intValue(0);
+
+                        assertTrue(keyVal < KEY_COUNT);
+                        assertEquals(keyVal, row.intValue(1));
+                    }
+                }
+                else
+                    assertTrue(resp.getValues().isEmpty());
+
+                return null;
+            }).when(clo).success(any(MultiRowsResponse.class));
+
+            Set<BinaryRow> keyRows = new HashSet<>(KEY_COUNT);
+
+            for (int i = 0; i < KEY_COUNT; i++)
+                keyRows.add(getTestKey(i));
+
+            when(clo.command()).thenReturn(new DeleteAllCommand(keyRows));
+        }));
+    }
+
+    /**
+     * @param existed True if the rows exist, false otherwise.
+     */
+    private void readAll(boolean existed) {
+        commandListener.onRead(batchIterator(clo -> {
+            doAnswer(invocation -> {
+                MultiRowsResponse resp = invocation.getArgument(0);
+
+                if (existed) {
+                    assertEquals(KEY_COUNT, resp.getValues().size());
+
+                    for (BinaryRow binaryRow : resp.getValues()) {
+                        Row row = new Row(SCHEMA, binaryRow);
+
+                        int keyVal = row.intValue(0);
+
+                        assertTrue(keyVal < KEY_COUNT);
+                        assertEquals(keyVal, row.intValue(1));
+                    }
+                }
+                else
+                    assertTrue(resp.getValues().isEmpty());
+
+                return null;
+            }).when(clo).success(any(MultiRowsResponse.class));
+
+            Set<BinaryRow> keyRows = new HashSet<>(KEY_COUNT);
+
+            for (int i = 0; i < KEY_COUNT; i++)
+                keyRows.add(getTestKey(i));
+
+            when(clo.command()).thenReturn(new GetAllCommand(keyRows));
+        }));
+    }
+
+    /**
+     * Upserts rows.
+     */
+    private void upsert() {
+        commandListener.onWrite(iterator((i, clo) -> {
+            when(clo.command()).thenReturn(new UpsertCommand(getTestRow(i, i)));
+
+            doAnswer(invocation -> {
+                assertNull(invocation.getArgument(0));
+
+                return null;
+            }).when(clo).success(any());
+        }));
+    }
+
+    /**
+     * @param existed True if the rows exist, false otherwise.
+     */
+    private void delete(boolean existed) {
+        commandListener.onWrite(iterator((i, clo) -> {
+            when(clo.command()).thenReturn(new DeleteCommand(getTestKey(i)));
+
+            doAnswer(invocation -> {
+                assertEquals(existed, invocation.getArgument(0));
+
+                return null;
+            }).when(clo).success(any());
+        }));
+    }
+
+    /**
+     * Reads rows from the listener and checks them.
+     *
+     * @param existed True if the rows exist, false otherwise.
+     */
+    private void readAndChak(boolean existed) {
+        readAndChak(existed, i -> i);
+    }
+
+    /**
+     * Reads rows from the listener and checks that the values match those produced by a mapper.
+     *
+     * @param existed True if the rows exist, false otherwise.
+     * @param keyValueMapper Maps a key to the value expected for it.
+     */
+    private void readAndChak(boolean existed, Function<Integer, Integer> keyValueMapper) {

Review comment:
       Done.

##########
File path: modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
##########
@@ -0,0 +1,705 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.Row;
+import org.apache.ignite.internal.schema.RowAssembler;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAllCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndDeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndUpsertCommand;
+import org.apache.ignite.internal.table.distributed.command.GetCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertAllCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertCommand;
+import org.apache.ignite.internal.table.distributed.command.ReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.ReplaceIfExistCommand;
+import org.apache.ignite.internal.table.distributed.command.UpsertAllCommand;
+import org.apache.ignite.internal.table.distributed.command.UpsertCommand;
+import org.apache.ignite.internal.table.distributed.command.response.MultiRowsResponse;
+import org.apache.ignite.internal.table.distributed.command.response.SingleRowResponse;
+import org.apache.ignite.raft.client.Command;
+import org.apache.ignite.raft.client.service.CommandClosure;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for a table command listener.
+ */
+public class PartitionCommandListenerTest {
+    /** Key count. */
+    public static final int KEY_COUNT = 100;
+
+    /** Schema. */
+    public static SchemaDescriptor SCHEMA = new SchemaDescriptor(UUID.randomUUID(),
+        1,
+        new Column[] {new Column("key", NativeTypes.INTEGER, false)},
+        new Column[] {new Column("value", NativeTypes.INTEGER, false)}
+    );
+
+    /** Table command listener. */
+    private static PartitionCommandListener commandListener;
+
+    /**
+     * Initializes a table listener before tests.
+     */
+    @BeforeAll
+    public static void before() {
+        commandListener = new PartitionCommandListener();
+    }
+
+    /**
+     *

Review comment:
       Added javadoc.







[GitHub] [ignite-3] vldpyatkov commented on a change in pull request #118: IGNITE-14239 Raft based implementation of atomic protocol

Posted by GitBox <gi...@apache.org>.
vldpyatkov commented on a change in pull request #118:
URL: https://github.com/apache/ignite-3/pull/118#discussion_r640355110



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
##########
@@ -71,13 +85,37 @@ public InternalTableImpl(
 
     /** {@inheritDoc} */
     @Override public @NotNull CompletableFuture<BinaryRow> get(BinaryRow keyRow) {
-        return partitionMap.get(keyRow.hash() % partitions).<KVGetResponse>run(new GetCommand(keyRow))
-            .thenApply(KVGetResponse::getValue);
+        return partitionMap.get(keyRow.hash() % partitions).<SingleRowResponse>run(new GetCommand(keyRow))
+            .thenApply(response -> response.getValue());
     }
 
     /** {@inheritDoc} */
     @Override public @NotNull CompletableFuture<Collection<BinaryRow>> getAll(Collection<BinaryRow> keyRows) {
-        return null;
+        HashMap<Integer, HashSet<BinaryRow>> setByPartition = new HashMap<>();

Review comment:
       Renamed.







[GitHub] [ignite-3] vldpyatkov commented on a change in pull request #118: IGNITE-14239 Raft based implementation of atomic protocol

Posted by GitBox <gi...@apache.org>.
vldpyatkov commented on a change in pull request #118:
URL: https://github.com/apache/ignite-3/pull/118#discussion_r643025319



##########
File path: modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
##########
@@ -0,0 +1,699 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.Row;
+import org.apache.ignite.internal.schema.RowAssembler;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAllCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndDeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndUpsertCommand;
+import org.apache.ignite.internal.table.distributed.command.GetCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertAllCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertCommand;
+import org.apache.ignite.internal.table.distributed.command.ReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.ReplaceIfExistCommand;
+import org.apache.ignite.internal.table.distributed.command.UpsertAllCommand;
+import org.apache.ignite.internal.table.distributed.command.UpsertCommand;
+import org.apache.ignite.internal.table.distributed.command.response.MultiRowsResponse;
+import org.apache.ignite.internal.table.distributed.command.response.SingleRowResponse;
+import org.apache.ignite.raft.client.Command;
+import org.apache.ignite.raft.client.service.CommandClosure;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for a table command listener.
+ */
+public class PartitionCommandListenerTest {
+    /** Key count. */
+    public static final int KEY_COUNT = 100;
+
+    /** Schema. */
+    public static SchemaDescriptor SCHEMA = new SchemaDescriptor(UUID.randomUUID(),
+        1,
+        new Column[] {new Column("key", NativeTypes.INTEGER, false)},
+        new Column[] {new Column("value", NativeTypes.INTEGER, false)}
+    );
+
+    /** Table command listener. */
+    private static PartitionListener commandListener;
+
+    /**
+     * Initializes a table listener before tests.
+     */
+    @BeforeAll
+    public static void before() {
+        commandListener = new PartitionListener();
+    }
+
+    /**
+     * Inserts rows and checks them.
+     * All rows are removed before returning.

Review comment:
       Done.







[GitHub] [ignite-3] vldpyatkov commented on a change in pull request #118: IGNITE-14239 Raft based implementation of atomic protocol

Posted by GitBox <gi...@apache.org>.
vldpyatkov commented on a change in pull request #118:
URL: https://github.com/apache/ignite-3/pull/118#discussion_r640367231



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
##########
@@ -102,12 +158,36 @@ public InternalTableImpl(
 
     /** {@inheritDoc} */
     @Override public @NotNull CompletableFuture<Collection<BinaryRow>> insertAll(Collection<BinaryRow> rows) {
-        return null;
+        HashMap<Integer, HashSet<BinaryRow>> setByPartition = new HashMap<>();
+
+        for (BinaryRow keyRow : rows) {
+            setByPartition.computeIfAbsent(keyRow.hash() % partitions, HashSet::new)
+                .add(keyRow);
+        }
+
+        CompletableFuture<MultiRowsResponse>[] futures = new CompletableFuture[setByPartition.size()];
+
+        int batchNum = 0;
+
+        for (Map.Entry<Integer, HashSet<BinaryRow>> partToRows : setByPartition.entrySet()) {
+            futures[batchNum] = partitionMap.get(partToRows.getKey()).run(new InsertAllCommand(partToRows.getValue()));
+
+            batchNum++;
+        }
+
+        CompletableFuture<Collection<BinaryRow>> future = CompletableFuture.allOf(futures)

Review comment:
       Done.
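
The fan-out in this hunk can be looked at in isolation: group the rows by partition, send one batch command per partition, and merge the per-partition results once `CompletableFuture.allOf` completes. The sketch below is only an illustration under stated assumptions, not the code from the PR: `PartitionFanOut`, `runAll` and `runBatch` are hypothetical names, and `Integer` stands in for `BinaryRow`. It deliberately uses `Math.floorMod` and `k -> new HashSet<>()`, because `%` yields a negative index for a negative hash, and `HashSet::new` passed to `computeIfAbsent` with an `Integer` key binds to the `HashSet(int initialCapacity)` constructor, so the partition id ends up as the initial capacity.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

/** Hypothetical helper; Integer stands in for BinaryRow. */
class PartitionFanOut {
    /** Groups rows by partition, runs one batch per partition and merges the results. */
    static CompletableFuture<Collection<Integer>> runAll(
        Collection<Integer> rows,
        int partitions,
        BiFunction<Integer, Set<Integer>, CompletableFuture<Set<Integer>>> runBatch
    ) {
        Map<Integer, Set<Integer>> rowsByPartition = new HashMap<>();

        for (Integer row : rows) {
            // floorMod keeps the index in [0, partitions) even for a negative hash.
            int part = Math.floorMod(row.hashCode(), partitions);

            rowsByPartition.computeIfAbsent(part, k -> new HashSet<>()).add(row);
        }

        List<CompletableFuture<Set<Integer>>> futures = new ArrayList<>();

        for (Map.Entry<Integer, Set<Integer>> e : rowsByPartition.entrySet())
            futures.add(runBatch.apply(e.getKey(), e.getValue()));

        // allOf completes once every batch is done, so join() below does not block.
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(ignored -> {
                Collection<Integer> res = new ArrayList<>();

                for (CompletableFuture<Set<Integer>> f : futures)
                    res.addAll(f.join());

                return res;
            });
    }

    public static void main(String[] args) {
        // The stub batch runner simply echoes the rows it receives.
        Collection<Integer> res = runAll(List.of(-3, 1, 2, 7), 4,
            (part, batch) -> CompletableFuture.completedFuture(batch)).join();

        System.out.println(res.size()); // prints 4
    }
}
```

Collecting the futures in a list avoids the unchecked generic array creation that `new CompletableFuture[n]` requires when the element type is parameterized.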







[GitHub] [ignite-3] vldpyatkov commented on a change in pull request #118: IGNITE-14239 Raft based implementation of atomic protocol

Posted by GitBox <gi...@apache.org>.
vldpyatkov commented on a change in pull request #118:
URL: https://github.com/apache/ignite-3/pull/118#discussion_r643096904



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteExactAllCommand.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.raft.client.WriteCommand;
+
+/**
+ * The command deletes a batch of rows.
+ */
+public class DeleteExactAllCommand implements WriteCommand {
+    /** Rows. */
+    private transient Set<BinaryRow> rows;
+
+    /*
+     * Row bytes.
+     * This is a temporary solution until the network layer correctly serializes BinaryRow.
+     * TODO: Remove the field after (IGNITE-14793).
+     */
+    private byte[] rowsBytes;
+
+    /**
+     * @param rows Rows.
+     */
+    public DeleteExactAllCommand(Set<BinaryRow> rows) {

Review comment:
       Done.
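
The hunk stops before the constructor body, so how `rowsBytes` is filled is not visible here. As a rough sketch of the general pattern only (a `transient` collection that is shadowed by a serializable byte array and rebuilt lazily on the receiving side), assuming plain Java serialization and `String` rows in place of `BinaryRow`; `BatchCommandSketch`, `toBytes`, `fromBytes` and `roundTrip` are hypothetical names:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical command; String rows stand in for BinaryRow. */
class BatchCommandSketch implements Serializable {
    /** Rows; skipped by Java serialization and rebuilt from rowsBytes on demand. */
    private transient Set<String> rows;

    /** Serialized form of the rows; this is the field that is actually transferred. */
    private final byte[] rowsBytes;

    BatchCommandSketch(Set<String> rows) {
        this.rows = rows;
        this.rowsBytes = toBytes(new HashSet<>(rows));
    }

    /** Returns the rows, restoring them from bytes after deserialization. */
    @SuppressWarnings("unchecked")
    Set<String> getRows() {
        if (rows == null && rowsBytes != null)
            rows = (Set<String>)fromBytes(rowsBytes);

        return rows;
    }

    /** Simulates sending the command over the wire and receiving it. */
    static BatchCommandSketch roundTrip(BatchCommandSketch cmd) {
        return (BatchCommandSketch)fromBytes(toBytes(cmd));
    }

    private static byte[] toBytes(Object obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();

        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        catch (IOException e) {
            throw new IllegalStateException("Failed to serialize", e);
        }

        return bytes.toByteArray();
    }

    private static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        }
        catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Failed to deserialize", e);
        }
    }

    public static void main(String[] args) {
        BatchCommandSketch copy = roundTrip(new BatchCommandSketch(Set.of("a", "b")));

        System.out.println(copy.getRows().size()); // prints 2
    }
}
```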







[GitHub] [ignite-3] vldpyatkov commented on a change in pull request #118: IGNITE-14239 Raft based implementation of atomic protocol

Posted by GitBox <gi...@apache.org>.
vldpyatkov commented on a change in pull request #118:
URL: https://github.com/apache/ignite-3/pull/118#discussion_r640353661



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
##########
@@ -71,13 +85,37 @@ public InternalTableImpl(
 
     /** {@inheritDoc} */
     @Override public @NotNull CompletableFuture<BinaryRow> get(BinaryRow keyRow) {
-        return partitionMap.get(keyRow.hash() % partitions).<KVGetResponse>run(new GetCommand(keyRow))
-            .thenApply(KVGetResponse::getValue);
+        return partitionMap.get(keyRow.hash() % partitions).<SingleRowResponse>run(new GetCommand(keyRow))
+            .thenApply(response -> response.getValue());

Review comment:
       Replaced.
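
For reference, the lambda `response -> response.getValue()` in the added line and a method reference to the same getter are interchangeable inside `thenApply`. A minimal sketch, assuming a hypothetical `RowResponse` class in place of `SingleRowResponse` and a `String` in place of `BinaryRow`:

```java
import java.util.concurrent.CompletableFuture;

class MethodRefDemo {
    /** Hypothetical stand-in for SingleRowResponse; String replaces BinaryRow. */
    static class RowResponse {
        private final String value;

        RowResponse(String value) {
            this.value = value;
        }

        String getValue() {
            return value;
        }
    }

    /** Unwraps the response with an explicit lambda. */
    static CompletableFuture<String> viaLambda(CompletableFuture<RowResponse> fut) {
        return fut.thenApply(response -> response.getValue());
    }

    /** Unwraps the response with a method reference; same behaviour, less noise. */
    static CompletableFuture<String> viaMethodRef(CompletableFuture<RowResponse> fut) {
        return fut.thenApply(RowResponse::getValue);
    }

    public static void main(String[] args) {
        CompletableFuture<RowResponse> fut = CompletableFuture.completedFuture(new RowResponse("row-1"));

        System.out.println(viaLambda(fut).join().equals(viaMethodRef(fut).join())); // prints true
    }
}
```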







[GitHub] [ignite-3] vldpyatkov commented on a change in pull request #118: IGNITE-14239 Raft based implementation of atomic protocol

Posted by GitBox <gi...@apache.org>.
vldpyatkov commented on a change in pull request #118:
URL: https://github.com/apache/ignite-3/pull/118#discussion_r640364679



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java
##########
@@ -172,22 +175,34 @@ public KVBinaryViewImpl(InternalTable tbl, SchemaRegistry schemaReg) {
 
     /** {@inheritDoc} */
     @Override public Collection<Tuple> removeAll(Collection<Tuple> keys) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        Objects.requireNonNull(keys);
+
+        return sync(removeAllAsync(keys));
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Tuple> removeAllAsync(Collection<Tuple> keys) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+    @Override public @NotNull CompletableFuture<Collection<Tuple>> removeAllAsync(Collection<Tuple> keys) {
+        Objects.requireNonNull(keys);
+
+        return tbl.deleteAll(keys.stream().map(k -> marsh.marshal(k, null)).collect(Collectors.toList()))
+            .thenApply(this::wrap)
+            .thenApply(ts -> ts.stream().filter(Objects::nonNull).map(TableRow::valueChunk).collect(Collectors.toList()));
     }
 
     /** {@inheritDoc} */
     @Override public Tuple getAndRemove(Tuple key) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        Objects.requireNonNull(key);
+
+        return sync(getAndRemoveAsync(key));
     }
 
     /** {@inheritDoc} */
     @Override public @NotNull CompletableFuture<Tuple> getAndRemoveAsync(Tuple key) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        Objects.requireNonNull(key);

Review comment:
       Done.
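
The blocking methods in this hunk validate their argument and then delegate to the async variant through `sync(...)`. The body of `sync` is not part of the quoted diff, so the sketch below is an assumption about what such a helper might look like, not the project's implementation; `SyncOverAsyncSketch` is a hypothetical class and `String` keys stand in for `Tuple`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/** Hypothetical view; String keys stand in for Tuple. */
class SyncOverAsyncSketch {
    /** Blocking variant: validates the argument, then waits for the async variant. */
    Collection<String> removeAll(Collection<String> keys) {
        Objects.requireNonNull(keys);

        return sync(removeAllAsync(keys));
    }

    /** Async variant; this stub completes immediately and reports every key as removed. */
    CompletableFuture<Collection<String>> removeAllAsync(Collection<String> keys) {
        Objects.requireNonNull(keys);

        Collection<String> removed = new ArrayList<>(keys);

        return CompletableFuture.completedFuture(removed);
    }

    /** Assumed shape of the helper: waits for the future and rethrows failures unchecked. */
    static <T> T sync(CompletableFuture<T> fut) {
        try {
            return fut.get();
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so that callers can still observe it.
            Thread.currentThread().interrupt();

            throw new IllegalStateException("Interrupted while waiting for the result", e);
        }
        catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println(new SyncOverAsyncSketch().removeAll(List.of("k1", "k2")).size()); // prints 2
    }
}
```

Checking the argument in both variants means a null key collection fails fast on the caller's thread in either entry point.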







[GitHub] [ignite-3] vldpyatkov commented on a change in pull request #118: IGNITE-14239 Raft based implementation of atomic protocol

Posted by GitBox <gi...@apache.org>.
vldpyatkov commented on a change in pull request #118:
URL: https://github.com/apache/ignite-3/pull/118#discussion_r640363588



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java
##########
@@ -172,22 +175,34 @@ public KVBinaryViewImpl(InternalTable tbl, SchemaRegistry schemaReg) {
 
     /** {@inheritDoc} */
     @Override public Collection<Tuple> removeAll(Collection<Tuple> keys) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        Objects.requireNonNull(keys);

Review comment:
       Done.







[GitHub] [ignite-3] asfgit closed pull request #118: IGNITE-14239 Raft based implementation of atomic protocol

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #118:
URL: https://github.com/apache/ignite-3/pull/118


   





[GitHub] [ignite-3] vldpyatkov commented on a change in pull request #118: IGNITE-14239 Raft based implementation of atomic protocol

Posted by GitBox <gi...@apache.org>.
vldpyatkov commented on a change in pull request #118:
URL: https://github.com/apache/ignite-3/pull/118#discussion_r640369734



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListener.java
##########
@@ -90,38 +123,150 @@ else if (clo.command() instanceof ReplaceCommand) {
                     clo.success(false);
             }
             else if (clo.command() instanceof UpsertCommand) {
-                storage.put(
-                    extractAndWrapKey(((UpsertCommand)clo.command()).getRow()),
-                    ((UpsertCommand)clo.command()).getRow()
-                );
+                BinaryRow row = ((UpsertCommand)clo.command()).getRow();
+
+                assert row.hasValue() : "Upsert command should have a value.";
+
+                storage.put(extractAndWrapKey(row), row);
 
                 clo.success(null);
             }
-            else
-                assert false : "Command was not found [cmd=" + clo.command() + ']';
-        }
-    }
+            else if (clo.command() instanceof InsertAllCommand) {
+                Set<BinaryRow> rows = ((InsertAllCommand)clo.command()).getRows();
 
-    /**
-     * @param row Row.
-     * @return Extracted key.
-     */
-    @NotNull private boolean equalValues(@NotNull BinaryRow row, @NotNull BinaryRow row2) {
-        if (row.hasValue() ^ row2.hasValue())
-            return false;
+                assert rows != null && !rows.isEmpty();
 
-        return row.valueSlice().compareTo(row2.valueSlice()) == 0;
-    }
+                final Set<BinaryRow> res = rows.stream()
+                    .map(k -> storage.putIfAbsent(extractAndWrapKey(k), k) == null ? null : k)
+                    .filter(Objects::nonNull)
+                    .filter(BinaryRow::hasValue)
+                    .collect(Collectors.toSet());
 
-    /**
-     * @param row Row.
-     * @return Extracted key.
-     */
-    @NotNull private KeyWrapper extractAndWrapKey(@NotNull BinaryRow row) {
-        final byte[] bytes = new byte[row.keySlice().capacity()];
-        row.keySlice().get(bytes);
+                clo.success(new MultiRowsResponse(res));
+            }
+            else if (clo.command() instanceof UpsertAllCommand) {
+                Set<BinaryRow> rows = ((UpsertAllCommand)clo.command()).getRows();
 
-        return new KeyWrapper(bytes, row.hash());
+                assert rows != null && !rows.isEmpty();
+
+                rows.stream()
+                    .forEach(k -> storage.put(extractAndWrapKey(k), k));
+
+                clo.success(null);
+            }
+            else if (clo.command() instanceof DeleteAllCommand) {
+                Set<BinaryRow> rows = ((DeleteAllCommand)clo.command()).getRows();
+
+                assert rows != null && !rows.isEmpty();
+
+                final Set<BinaryRow> res = rows.stream()
+                    .map(k -> {
+                        if (k.hasValue())
+                            return null;
+                        else {
+                            BinaryRow r = storage.remove(extractAndWrapKey(k));
+
+                            if (r == null)

Review comment:
       Done.







[GitHub] [ignite-3] sk0x50 commented on a change in pull request #118: IGNITE-14239 Raft based implementation of atomic protocol

Posted by GitBox <gi...@apache.org>.
sk0x50 commented on a change in pull request #118:
URL: https://github.com/apache/ignite-3/pull/118#discussion_r642939296



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/MultiRowsResponse.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command.response;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashSet;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.table.distributed.command.CommandUtils;
+import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAllCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertAllCommand;
+
+/**
+ * It is a response object to return a collection from the batch operation.
+ * @see GetAllCommand
+ * @see DeleteAllCommand
+ * @see InsertAllCommand
+ * @see DeleteExactAllCommand
+ */
+public class MultiRowsResponse implements Serializable {
+    /** Row. */

Review comment:
       Collection of binary rows.

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/MultiRowsResponse.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command.response;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashSet;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.table.distributed.command.CommandUtils;
+import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAllCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertAllCommand;
+
+/**
+ * It is a response object to return a collection from the batch operation.

Review comment:
       This class represents a response object that contains a collection of {@link BinaryRow} instances from a batch operation.

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/MultiRowsResponse.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command.response;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashSet;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.table.distributed.command.CommandUtils;
+import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAllCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertAllCommand;
+
+/**
+ * It is a response object to return a collection from the batch operation.
+ * @see GetAllCommand
+ * @see DeleteAllCommand
+ * @see InsertAllCommand
+ * @see DeleteExactAllCommand
+ */
+public class MultiRowsResponse implements Serializable {
+    /** Row. */
+    private transient Collection<BinaryRow> rows;
+
+    /*
+     * Row bytes.
+     * It is a temporary solution, before network have not implement correct serialization BinaryRow.
+     * TODO: Remove the field after (IGNITE-14793).
+     */
+    private byte[] rowsBytes;
+
+    /**
+     * @param rows Rows.
+     */
+    public MultiRowsResponse(Collection<BinaryRow> rows) {
+        this.rows = rows;
+
+        CommandUtils.rowsToBytes(rows, bytes -> rowsBytes = bytes);
+    }
+
+    /**
+     * @return Data row.

Review comment:
       Collection of binary rows || Binary rows.

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/CommandUtils.java
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.lang.IgniteLogger;
+
+/**
+ * This is an utility class for serialization cache tuples. It will be removed after another way for serialization is
+ * implemented into the network layer.
+ * TODO: Remove it after (IGNITE-14793)
+ */
+public class CommandUtils {
+    /** The logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(CommandUtils.class);
+
+    /**
+     * Writes a list of rows to byte array.
+     *
+     * @param rows List of rows.

Review comment:
       Collection of rows.

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/SingleRowResponse.java
##########
@@ -17,63 +17,40 @@
 
 package org.apache.ignite.internal.table.distributed.command.response;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
 import java.io.Serializable;
-import java.util.function.Consumer;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.table.distributed.command.CommandUtils;
+import org.apache.ignite.internal.table.distributed.command.GetAndDeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndUpsertCommand;
 import org.apache.ignite.internal.table.distributed.command.GetCommand;
-import org.apache.ignite.lang.IgniteLogger;
 
 /**
- * It is a response object for handling a table get command.
+ * It is a response object to return a row from the single operation.

Review comment:
       This class represents a response message that contains a single binary row.

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/MultiRowsResponse.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command.response;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashSet;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.table.distributed.command.CommandUtils;
+import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAllCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertAllCommand;
+
+/**
+ * It is a response object to return a collection from the batch operation.
+ * @see GetAllCommand
+ * @see DeleteAllCommand
+ * @see InsertAllCommand
+ * @see DeleteExactAllCommand
+ */
+public class MultiRowsResponse implements Serializable {
+    /** Row. */
+    private transient Collection<BinaryRow> rows;
+
+    /*
+     * Row bytes.
+     * It is a temporary solution, before network have not implement correct serialization BinaryRow.
+     * TODO: Remove the field after (IGNITE-14793).
+     */
+    private byte[] rowsBytes;
+
+    /**
+     * @param rows Rows.

Review comment:
       ```
   Creates a new instance of MultiRowsResponse with the given collection of binary rows.
   
   @param rows Collection of binary rows to be returned. (or Binary rows - it is up to you)
   ```

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/SingleRowResponse.java
##########
@@ -17,63 +17,40 @@
 
 package org.apache.ignite.internal.table.distributed.command.response;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
 import java.io.Serializable;
-import java.util.function.Consumer;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.table.distributed.command.CommandUtils;
+import org.apache.ignite.internal.table.distributed.command.GetAndDeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndUpsertCommand;
 import org.apache.ignite.internal.table.distributed.command.GetCommand;
-import org.apache.ignite.lang.IgniteLogger;
 
 /**
- * It is a response object for handling a table get command.
+ * It is a response object to return a row from the single operation.
+ * @see GetCommand
+ * @see GetAndDeleteCommand
+ * @see GetAndUpsertCommand
+ * @see GetAndReplaceCommand
  */
-public class KVGetResponse implements Serializable {
-    /** The logger. */
-    private static final IgniteLogger LOG = IgniteLogger.forClass(GetCommand.class);
-
+public class SingleRowResponse implements Serializable {
     /** Row. */
     private transient BinaryRow row;
 
     /*
      * Row bytes.
      * It is a temporary solution, before network have not implement correct serialization BinaryRow.
-     * TODO: Remove the field after.
+     * TODO: Remove the field after (IGNITE-14793).
      */
     private byte[] rowBytes;
 
-    public KVGetResponse(BinaryRow row) {
-        this.row = row;
-
-        rowToBytes(row, bytes -> rowBytes = bytes);
-    }
-
     /**
-     * Writes a row to byte array.
-     *
      * @param row Row.
-     * @param consumer Byte array consumer.
      */
-    private void rowToBytes(BinaryRow row, Consumer<byte[]> consumer) {
-        if (row == null) {
-            consumer.accept(null);
-
-            return;
-        }
-
-        try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-            row.writeTo(baos);
-
-            baos.flush();
-
-            consumer.accept(baos.toByteArray());
-        }
-        catch (IOException e) {
-            LOG.error("Could not write row to stream [row=" + row + ']', e);
+    public SingleRowResponse(BinaryRow row) {

Review comment:
       ```
   Creates a new instance of SingleRowResponse with the given binary row.
   
   @param row Binary row to be returned. || Binary row.
   ```

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteAllCommand.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.raft.client.WriteCommand;
+
+/**
+ * The command deletes a batch rows.
+ */
+public class DeleteAllCommand implements WriteCommand {
+    /** Rows. */
+    private transient Set<BinaryRow> rows;
+
+    /*
+     * Row bytes.
+     * It is a temporary solution, before network have not implement correct serialization BinaryRow.
+     * TODO: Remove the field after (IGNITE-14793).
+     */
+    private byte[] rowsBytes;
+
+    /**
+     * @param rows Rows.
+     */
+    public DeleteAllCommand(Set<BinaryRow> rows) {

Review comment:
       @NotNull

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteExactAllCommand.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.raft.client.WriteCommand;
+
+/**
+ * The command deletes a batch rows.
+ */
+public class DeleteExactAllCommand implements WriteCommand {
+    /** Rows. */
+    private transient Set<BinaryRow> rows;
+
+    /*
+     * Row bytes.
+     * It is a temporary solution, before network have not implement correct serialization BinaryRow.
+     * TODO: Remove the field after (IGNITE-14793).
+     */
+    private byte[] rowsBytes;
+
+    /**
+     * @param rows Rows.
+     */
+    public DeleteExactAllCommand(Set<BinaryRow> rows) {
+        assert rows != null && !rows.isEmpty();
+
+        this.rows = rows;
+
+        CommandUtils.rowsToBytes(rows, bytes -> rowsBytes = bytes);
+    }
+
+    /**
+     * Gets a list of keys which will used in the command.

Review comment:
       Returns a set of binary rows (keys) to be deleted.

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteExactAllCommand.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.raft.client.WriteCommand;
+
+/**
+ * The command deletes a batch rows.

Review comment:
       It would be nice to highlight the difference between this command and DeleteAllCommand (if I'm not mistaken, the latter uses only the key to determine which rows to delete).
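
To make the distinction raised in this comment concrete, here is a minimal sketch of key-only versus exact batch deletion. It assumes a plain `Map<String, String>` as a stand-in for the partition storage; the class name, method names, and return values are hypothetical and are not taken from the ignite-3 code base.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for the partition storage; not the ignite-3 API. */
final class BatchDeleteSketch {
    /** Key-only batch delete: removes each key whatever value is stored. Returns the keys that were absent. */
    static Set<String> deleteAll(Map<String, String> storage, Set<String> keys) {
        Set<String> missing = new HashSet<>();

        for (String key : keys) {
            if (storage.remove(key) == null)
                missing.add(key);
        }

        return missing;
    }

    /** Exact batch delete: removes an entry only if key and value both match. Returns the keys that were skipped. */
    static Set<String> deleteExactAll(Map<String, String> storage, Map<String, String> rows) {
        Set<String> skipped = new HashSet<>();

        for (Map.Entry<String, String> row : rows.entrySet()) {
            // Map.remove(key, value) removes the entry only when the stored value equals the given one.
            if (!storage.remove(row.getKey(), row.getValue()))
                skipped.add(row.getKey());
        }

        return skipped;
    }

    public static void main(String[] args) {
        Map<String, String> storage = new HashMap<>();
        storage.put("k1", "v1");
        storage.put("k2", "v2");

        // The value given for k2 does not match, so only k1 is removed.
        Set<String> skipped = deleteExactAll(storage, Map.of("k1", "v1", "k2", "other"));
        System.out.println("skipped=" + skipped + ", left=" + storage);

        // The key-only delete ignores the value and removes k2; k3 was never stored.
        Set<String> missing = deleteAll(storage, Set.of("k2", "k3"));
        System.out.println("missing=" + missing + ", left=" + storage);
    }
}
```

Returning the unprocessed keys is only a choice made for this sketch; the Javadoc for the real commands should state what they actually return.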

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteExactAllCommand.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.raft.client.WriteCommand;
+
+/**
+ * The command deletes a batch rows.
+ */
+public class DeleteExactAllCommand implements WriteCommand {
+    /** Rows. */
+    private transient Set<BinaryRow> rows;
+
+    /*
+     * Row bytes.
+     * It is a temporary solution, before network have not implement correct serialization BinaryRow.
+     * TODO: Remove the field after (IGNITE-14793).
+     */
+    private byte[] rowsBytes;
+
+    /**
+     * @param rows Rows.

Review comment:
       Creates a new instance of DeleteExactAllCommand with the given set of rows to be deleted.
   {@code rows} should not be {@code null} or empty.
   
   @param rows Collection of binary rows to be deleted.

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteAllCommand.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.raft.client.WriteCommand;
+
+/**
+ * The command deletes a batch rows.
+ */
+public class DeleteAllCommand implements WriteCommand {
+    /** Rows. */
+    private transient Set<BinaryRow> rows;
+
+    /*
+     * Row bytes.
+     * It is a temporary solution, before network have not implement correct serialization BinaryRow.
+     * TODO: Remove the field after (IGNITE-14793).
+     */
+    private byte[] rowsBytes;
+
+    /**
+     * @param rows Rows.
+     */
+    public DeleteAllCommand(Set<BinaryRow> rows) {
+        assert rows != null && !rows.isEmpty();
+
+        this.rows = rows;
+
+        CommandUtils.rowsToBytes(rows, bytes -> rowsBytes = bytes);
+    }
+
+    /**
+     * Gets a list of keys which will used in the command.

Review comment:
       Returns a set of binary rows (keys) to be deleted.

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteExactCommand.java
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command;
+
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.raft.client.WriteCommand;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * The command deletes a entry by passed key.

Review comment:
       It would be nice to mention the difference between this command and DeleteCommand.
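
As a rough illustration of what such a note could describe, the sketch below contrasts a delete by key with a delete that also requires the stored value to match. It is an assumption based on the command names, not on the actual ignite-3 implementation; the `Map` stand-in and all names in it are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical single-row analogue of the two commands; not the ignite-3 API. */
final class SingleDeleteSketch {
    /** Delete by key: succeeds whenever the key is present. */
    static boolean delete(Map<String, String> storage, String key) {
        return storage.remove(key) != null;
    }

    /** Exact delete: succeeds only when the key is present and its value equals the expected one. */
    static boolean deleteExact(Map<String, String> storage, String key, String expectedValue) {
        return storage.remove(key, expectedValue);
    }

    public static void main(String[] args) {
        Map<String, String> storage = new HashMap<>();
        storage.put("k", "v");

        System.out.println(deleteExact(storage, "k", "stale")); // value differs, entry is kept
        System.out.println(delete(storage, "k"));               // key matches, entry is removed
    }
}
```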

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteAllCommand.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.raft.client.WriteCommand;
+
+/**
+ * The command deletes a batch rows.
+ */
+public class DeleteAllCommand implements WriteCommand {
+    /** Rows. */
+    private transient Set<BinaryRow> rows;
+
+    /*
+     * Row bytes.
+     * It is a temporary solution, before network have not implement correct serialization BinaryRow.
+     * TODO: Remove the field after (IGNITE-14793).
+     */
+    private byte[] rowsBytes;
+
+    /**
+     * @param rows Rows.
+     */
+    public DeleteAllCommand(Set<BinaryRow> rows) {

Review comment:
       ```
   Creates a new instance of DeleteAllCommand with the given set of rows to be deleted.
   {@code rows} should not be {@code null} or empty.
   
   @param rows Collection of binary rows to be deleted.
   ```

##########
File path: modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
##########
@@ -0,0 +1,699 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.Row;
+import org.apache.ignite.internal.schema.RowAssembler;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAllCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndDeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndUpsertCommand;
+import org.apache.ignite.internal.table.distributed.command.GetCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertAllCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertCommand;
+import org.apache.ignite.internal.table.distributed.command.ReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.ReplaceIfExistCommand;
+import org.apache.ignite.internal.table.distributed.command.UpsertAllCommand;
+import org.apache.ignite.internal.table.distributed.command.UpsertCommand;
+import org.apache.ignite.internal.table.distributed.command.response.MultiRowsResponse;
+import org.apache.ignite.internal.table.distributed.command.response.SingleRowResponse;
+import org.apache.ignite.raft.client.Command;
+import org.apache.ignite.raft.client.service.CommandClosure;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * There are a tests for a table command listener.
+ */
+public class PartitionCommandListenerTest {
+    /** Key count. */
+    public static final int KEY_COUNT = 100;
+
+    /** Schema. */
+    public static SchemaDescriptor SCHEMA = new SchemaDescriptor(UUID.randomUUID(),
+        1,
+        new Column[] {new Column("key", NativeTypes.INTEGER, false)},
+        new Column[] {new Column("value", NativeTypes.INTEGER, false)}
+    );
+
+    /** Table command listener. */
+    private static PartitionListener commandListener;
+
+    /**
+     * Inisializes a table listener before tests.

Review comment:
       initializes

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteExactAllCommand.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.raft.client.WriteCommand;
+
+/**
+ * The command deletes a batch rows.
+ */
+public class DeleteExactAllCommand implements WriteCommand {
+    /** Rows. */
+    private transient Set<BinaryRow> rows;
+
+    /*
+     * Row bytes.
+     * It is a temporary solution, before network have not implement correct serialization BinaryRow.
+     * TODO: Remove the field after (IGNITE-14793).
+     */
+    private byte[] rowsBytes;
+
+    /**
+     * @param rows Rows.
+     */
+    public DeleteExactAllCommand(Set<BinaryRow> rows) {

Review comment:
       @NotNull

##########
File path: modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
##########
@@ -0,0 +1,699 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.Row;
+import org.apache.ignite.internal.schema.RowAssembler;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAllCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndDeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndUpsertCommand;
+import org.apache.ignite.internal.table.distributed.command.GetCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertAllCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertCommand;
+import org.apache.ignite.internal.table.distributed.command.ReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.ReplaceIfExistCommand;
+import org.apache.ignite.internal.table.distributed.command.UpsertAllCommand;
+import org.apache.ignite.internal.table.distributed.command.UpsertCommand;
+import org.apache.ignite.internal.table.distributed.command.response.MultiRowsResponse;
+import org.apache.ignite.internal.table.distributed.command.response.SingleRowResponse;
+import org.apache.ignite.raft.client.Command;
+import org.apache.ignite.raft.client.service.CommandClosure;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * There are a tests for a table command listener.
+ */
+public class PartitionCommandListenerTest {
+    /** Key count. */
+    public static final int KEY_COUNT = 100;
+
+    /** Schema. */
+    public static SchemaDescriptor SCHEMA = new SchemaDescriptor(UUID.randomUUID(),
+        1,
+        new Column[] {new Column("key", NativeTypes.INTEGER, false)},
+        new Column[] {new Column("value", NativeTypes.INTEGER, false)}
+    );
+
+    /** Table command listener. */
+    private static PartitionListener commandListener;
+
+    /**
+     * Inisializes a table listener before tests.
+     */
+    @BeforeAll
+    public static void before() {
+        commandListener = new PartitionListener();
+    }
+
+    /**
+     * Insrets rows and checks them.
+     * All rows remove before return.

Review comment:
       All rows are removed before returning. (here and below)

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteAllCommand.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.raft.client.WriteCommand;
+
+/**
+ * The command deletes a batch rows.
+ */
+public class DeleteAllCommand implements WriteCommand {
+    /** Rows. */
+    private transient Set<BinaryRow> rows;
+
+    /*
+     * Row bytes.
+     * It is a temporary solution, before network have not implement correct serialization BinaryRow.
+     * TODO: Remove the field after (IGNITE-14793).
+     */
+    private byte[] rowsBytes;
+
+    /**
+     * @param rows Rows.
+     */
+    public DeleteAllCommand(Set<BinaryRow> rows) {

Review comment:
       Please update all commands in the same way (where it is appropriate).

##########
File path: modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
##########
@@ -0,0 +1,699 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.Row;
+import org.apache.ignite.internal.schema.RowAssembler;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAllCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndDeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndUpsertCommand;
+import org.apache.ignite.internal.table.distributed.command.GetCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertAllCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertCommand;
+import org.apache.ignite.internal.table.distributed.command.ReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.ReplaceIfExistCommand;
+import org.apache.ignite.internal.table.distributed.command.UpsertAllCommand;
+import org.apache.ignite.internal.table.distributed.command.UpsertCommand;
+import org.apache.ignite.internal.table.distributed.command.response.MultiRowsResponse;
+import org.apache.ignite.internal.table.distributed.command.response.SingleRowResponse;
+import org.apache.ignite.raft.client.Command;
+import org.apache.ignite.raft.client.service.CommandClosure;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * There are a tests for a table command listener.
+ */
+public class PartitionCommandListenerTest {
+    /** Key count. */
+    public static final int KEY_COUNT = 100;
+
+    /** Schema. */
+    public static SchemaDescriptor SCHEMA = new SchemaDescriptor(UUID.randomUUID(),
+        1,
+        new Column[] {new Column("key", NativeTypes.INTEGER, false)},
+        new Column[] {new Column("value", NativeTypes.INTEGER, false)}
+    );
+
+    /** Table command listener. */
+    private static PartitionListener commandListener;
+
+    /**
+     * Inisializes a table listener before tests.
+     */
+    @BeforeAll
+    public static void before() {
+        commandListener = new PartitionListener();
+    }
+
+    /**
+     * Insrets rows and checks them.

Review comment:
       Inserts

##########
File path: modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
##########
@@ -0,0 +1,699 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.Row;
+import org.apache.ignite.internal.schema.RowAssembler;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAllCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndDeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndUpsertCommand;
+import org.apache.ignite.internal.table.distributed.command.GetCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertAllCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertCommand;
+import org.apache.ignite.internal.table.distributed.command.ReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.ReplaceIfExistCommand;
+import org.apache.ignite.internal.table.distributed.command.UpsertAllCommand;
+import org.apache.ignite.internal.table.distributed.command.UpsertCommand;
+import org.apache.ignite.internal.table.distributed.command.response.MultiRowsResponse;
+import org.apache.ignite.internal.table.distributed.command.response.SingleRowResponse;
+import org.apache.ignite.raft.client.Command;
+import org.apache.ignite.raft.client.service.CommandClosure;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * There are a tests for a table command listener.
+ */
+public class PartitionCommandListenerTest {
+    /** Key count. */
+    public static final int KEY_COUNT = 100;
+
+    /** Schema. */
+    public static SchemaDescriptor SCHEMA = new SchemaDescriptor(UUID.randomUUID(),
+        1,
+        new Column[] {new Column("key", NativeTypes.INTEGER, false)},
+        new Column[] {new Column("value", NativeTypes.INTEGER, false)}
+    );
+
+    /** Table command listener. */
+    private static PartitionListener commandListener;
+
+    /**
+     * Inisializes a table listener before tests.
+     */
+    @BeforeAll
+    public static void before() {
+        commandListener = new PartitionListener();
+    }
+
+    /**
+     * Insrets rows and checks them.
+     * All rows remove before return.
+     */
+    @Test
+    public void testInsertCommands() {
+        readAndCheck(false);
+
+        delete(false);
+
+        insert(false);
+
+        insert(true);
+
+        readAndCheck(true);
+
+        delete(true);
+    }
+
+    /**
+     * Upserts rows and checks them.
+     * All rows remove before return.
+     */
+    @Test
+    public void testUpsertValues() {
+        readAndCheck(false);
+
+        upsert();
+
+        readAndCheck(true);
+
+        delete(true);
+
+        readAndCheck(false);
+    }
+
+    /**
+     * Adds rows, replaces and checks them.
+     * All rows remove before return.
+     */
+    @Test
+    public void testReplaceCommand() {
+        upsert();
+
+        deleteExactValues(false);
+
+        replaceValues(true);
+
+        readAndCheck(true, i -> i + 1);
+
+        replaceValues(false);
+
+        readAndCheck(true, i -> i + 1);
+
+        deleteExactValues(true);
+
+        readAndCheck(false);
+    }
+
+    /**
+     * The test checks PutIfExist command.
+     * All rows remove before return.
+     */
+    @Test
+    public void testPutIfExistCommand() {
+        putIfExistValues(false);
+
+        readAndCheck(false);
+
+        upsert();
+
+        putIfExistValues(true);
+
+        readAndCheck(true, i -> i + 1);
+
+        getAndDeleteValues(true);
+
+        readAndCheck(false);
+
+        getAndDeleteValues(false);
+    }
+
+    /**
+     * The test checks GetAndReplace command.
+     * All rows remove before return.
+     */
+    @Test
+    public void testGetAndReplaceCommand() {
+        readAndCheck(false);
+
+        getAndUpsertValues(false);
+
+        readAndCheck(true);
+
+        getAndReplaceValues(true);
+
+        readAndCheck(true, i -> i + 1);
+
+        getAndUpsertValues(true);
+
+        readAndCheck(true);
+
+        deleteExactAllValues(true);
+
+        readAndCheck(false);
+
+        getAndReplaceValues(false);
+
+        deleteExactAllValues(false);
+    }
+
+    /**
+     * The test checks a batch upsert command.
+     * All rows remove before return.
+     */
+    @Test
+    public void testUpsertRowsBatchedAndCheck() {
+        readAll(false);
+
+        deleteAll(false);
+
+        upsertAll();
+
+        readAll(true);
+
+        deleteAll(true);
+
+        readAll(false);
+    }
+
+    /**
+     * The test checks a batch insert command.
+     * All rows remove before return.
+     */
+    @Test
+    public void testInsertRowsBatchedAndCheck() {
+        readAll(false);
+
+        deleteAll(false);
+
+        insertAll(false);
+
+        readAll(true);
+
+        insertAll(true);
+
+        deleteAll(true);
+
+        readAll(false);
+    }
+
+    /**
+     * Prepares a closure iterator for a specific batch operation.
+     *
+     * @param func The function prepare a closure for the operation.
+     * @param <T> Type of the operation.
+     * @return Closure iterator.
+     */
+    private <T extends Command> Iterator<CommandClosure<T>> batchIterator(Consumer<CommandClosure<T>> func) {
+        return new Iterator<CommandClosure<T>>() {
+            boolean moved;
+
+            @Override public boolean hasNext() {
+                return !moved;
+            }
+
+            @Override public CommandClosure<T> next() {
+                CommandClosure<T> clo = mock(CommandClosure.class);
+
+                func.accept(clo);
+
+                moved = true;
+
+                return clo;
+            }
+        };
+    }
+
+    /**
+     * Prepares a closure iterator for a specific operation.
+     *
+     * @param func The function prepare a closure for the operation.
+     * @param <T> Type of the operation.
+     * @return Closure iterator.
+     */
+    private <T extends Command> Iterator<CommandClosure<T>> iterator(BiConsumer<Integer, CommandClosure<T>> func) {
+        return new Iterator<CommandClosure<T>>() {
+            /** Iteration. */
+            private int i = 0;
+
+            /** {@inheritDoc} */
+            @Override public boolean hasNext() {
+                return i < KEY_COUNT;
+            }
+
+            /** {@inheritDoc} */
+            @Override public CommandClosure<T> next() {
+                CommandClosure<T> clo = mock(CommandClosure.class);
+
+                func.accept(i, clo);
+
+                i++;
+
+                return clo;
+            }
+        };
+    }
+
+    /**
+     * @param existed True if rows are existed, false otherwise.
+     */
+    private void insertAll(boolean existed) {
+        commandListener.onWrite(batchIterator(clo -> {
+            doAnswer(invocation -> {
+                MultiRowsResponse resp = invocation.getArgument(0);
+
+                if (existed) {
+                    assertEquals(KEY_COUNT, resp.getValues().size());
+
+                    for (BinaryRow binaryRow : resp.getValues()) {
+                        Row row = new Row(SCHEMA, binaryRow);
+
+                        int keyVal = row.intValue(0);
+
+                        assertTrue(keyVal < KEY_COUNT);
+                        assertEquals(keyVal, row.intValue(1));
+                    }
+                }
+                else
+                    assertTrue(resp.getValues().isEmpty());
+
+                return null;
+            }).when(clo).result(any(MultiRowsResponse.class));
+
+            Set<BinaryRow> rows = new HashSet<>(KEY_COUNT);
+
+            for (int i = 0; i < KEY_COUNT; i++)
+                rows.add(getTestRow(i, i));
+
+            when(clo.command()).thenReturn(new InsertAllCommand(rows));
+        }));
+    }
+
+    /**
+     * Upserts values from the listener in the batch operation.
+     */
+    private void upsertAll() {
+        commandListener.onWrite(batchIterator(clo -> {
+            doAnswer(invocation -> {
+                assertNull(invocation.getArgument(0));
+
+                return null;
+            }).when(clo).result(any());
+
+            Set<BinaryRow> rows = new HashSet<>(KEY_COUNT);
+
+            for (int i = 0; i < KEY_COUNT; i++)
+                rows.add(getTestRow(i, i));
+
+            when(clo.command()).thenReturn(new UpsertAllCommand(rows));
+        }));
+    }
+
+    /**
+     * @param existed True if rows are existed, false otherwise.
+     */
+    private void deleteAll(boolean existed) {
+        commandListener.onWrite(batchIterator(clo -> {
+            doAnswer(invocation -> {
+                MultiRowsResponse resp = invocation.getArgument(0);
+
+                if (existed) {
+                    assertEquals(KEY_COUNT, resp.getValues().size());
+
+                    for (BinaryRow binaryRow : resp.getValues()) {
+                        Row row = new Row(SCHEMA, binaryRow);
+
+                        int keyVal = row.intValue(0);
+
+                        assertTrue(keyVal < KEY_COUNT);
+                        assertEquals(keyVal, row.intValue(1));
+                    }
+                }
+                else
+                    assertTrue(resp.getValues().isEmpty());
+
+                return null;
+            }).when(clo).result(any(MultiRowsResponse.class));
+
+            Set<BinaryRow> keyRows = new HashSet<>(KEY_COUNT);
+
+            for (int i = 0; i < KEY_COUNT; i++)
+                keyRows.add(getTestKey(i));
+
+            when(clo.command()).thenReturn(new DeleteAllCommand(keyRows));
+        }));
+    }
+
+    /**
+     * @param existed True if rows are existed, false otherwise.
+     */
+    private void readAll(boolean existed) {
+        commandListener.onRead(batchIterator(clo -> {
+            doAnswer(invocation -> {
+                MultiRowsResponse resp = invocation.getArgument(0);
+
+                if (existed) {
+                    assertEquals(KEY_COUNT, resp.getValues().size());
+
+                    for (BinaryRow binaryRow : resp.getValues()) {
+                        Row row = new Row(SCHEMA, binaryRow);
+
+                        int keyVal = row.intValue(0);
+
+                        assertTrue(keyVal < KEY_COUNT);
+                        assertEquals(keyVal, row.intValue(1));
+                    }
+                }
+                else
+                    assertTrue(resp.getValues().isEmpty());
+
+                return null;
+            }).when(clo).result(any(MultiRowsResponse.class));
+
+            Set<BinaryRow> keyRows = new HashSet<>(KEY_COUNT);
+
+            for (int i = 0; i < KEY_COUNT; i++)
+                keyRows.add(getTestKey(i));
+
+            when(clo.command()).thenReturn(new GetAllCommand(keyRows));
+        }));
+    }
+
+    /**
+     * Upserts rows.
+     */
+    private void upsert() {
+        commandListener.onWrite(iterator((i, clo) -> {
+            when(clo.command()).thenReturn(new UpsertCommand(getTestRow(i, i)));
+
+            doAnswer(invocation -> {
+                assertNull(invocation.getArgument(0));
+
+                return null;
+            }).when(clo).result(any());
+        }));
+    }
+
+    /**
+     * @param existed True if rows are existed, false otherwise.
+     */
+    private void delete(boolean existed) {
+        commandListener.onWrite(iterator((i, clo) -> {
+            when(clo.command()).thenReturn(new DeleteCommand(getTestKey(i)));
+
+            doAnswer(invocation -> {
+                assertEquals(existed, invocation.getArgument(0));
+
+                return null;
+            }).when(clo).result(any());
+        }));
+    }
+
+    /**
+     * Reads rows from the listener and checks them.
+     *
+     * @param existed True if rows are existed, false otherwise.
+     */
+    private void readAndCheck(boolean existed) {
+        readAndCheck(existed, i -> i);
+    }
+
+    /**
+     * Reades rows from the listener and checks values as expected by a mapper.

Review comment:
       Reads







[GitHub] [ignite-3] vldpyatkov commented on a change in pull request #118: IGNITE-14239 Raft based implementation of atomic protocol

Posted by GitBox <gi...@apache.org>.
vldpyatkov commented on a change in pull request #118:
URL: https://github.com/apache/ignite-3/pull/118#discussion_r640356041



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
##########
@@ -71,13 +85,37 @@ public InternalTableImpl(
 
     /** {@inheritDoc} */
     @Override public @NotNull CompletableFuture<BinaryRow> get(BinaryRow keyRow) {
-        return partitionMap.get(keyRow.hash() % partitions).<KVGetResponse>run(new GetCommand(keyRow))
-            .thenApply(KVGetResponse::getValue);
+        return partitionMap.get(keyRow.hash() % partitions).<SingleRowResponse>run(new GetCommand(keyRow))
+            .thenApply(response -> response.getValue());
     }
 
     /** {@inheritDoc} */
     @Override public @NotNull CompletableFuture<Collection<BinaryRow>> getAll(Collection<BinaryRow> keyRows) {
-        return null;
+        HashMap<Integer, HashSet<BinaryRow>> setByPartition = new HashMap<>();
+
+        for (BinaryRow keyRow : keyRows) {
+            setByPartition.computeIfAbsent(keyRow.hash() % partitions, HashSet::new)
+                .add(keyRow);
+        }
+
+        CompletableFuture<MultiRowsResponse>[] futures = new CompletableFuture[setByPartition.size()];
+
+        int batchNum = 0;
+
+        for (Map.Entry<Integer, HashSet<BinaryRow>> partToRows : setByPartition.entrySet()) {
+            futures[batchNum] = partitionMap.get(partToRows.getKey()).run(new GetAllCommand(partToRows.getValue()));
+
+            batchNum++;
+        }
+
+        CompletableFuture<Collection<BinaryRow>> future = CompletableFuture.allOf(futures)

Review comment:
       Done.
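
The quoted `getAll` hunk is cut off at `CompletableFuture.allOf(futures)`, so the following is not the pull request's code. It is a minimal JDK-only sketch of the same pattern: group keys by partition, send one batch per partition, then merge the results once every batch has completed. The names `PartitionedGetAllSketch`, `groupByPartition`, and the `batchGet` callback are hypothetical stand-ins for the partition Raft clients and `GetAllCommand`. The use of `Math.floorMod` is an assumption made here so that a negative hash cannot produce a negative partition index.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

/** Illustrative only: not the InternalTableImpl code from the pull request. */
class PartitionedGetAllSketch {
    /** Groups keys by partition index; floorMod keeps the index in [0, partitions). */
    static <K> Map<Integer, Set<K>> groupByPartition(Collection<K> keys, int partitions) {
        Map<Integer, Set<K>> byPart = new HashMap<>();

        for (K key : keys)
            byPart.computeIfAbsent(Math.floorMod(key.hashCode(), partitions), p -> new HashSet<>()).add(key);

        return byPart;
    }

    /**
     * Sends one batch per partition and merges the per-partition results.
     * batchGet is a hypothetical stand-in for running a batch read on one partition.
     */
    static <K, R> CompletableFuture<Collection<R>> getAll(
        Collection<K> keys,
        int partitions,
        BiFunction<Integer, Set<K>, CompletableFuture<Collection<R>>> batchGet
    ) {
        List<CompletableFuture<Collection<R>>> futures = new ArrayList<>();

        for (Map.Entry<Integer, Set<K>> e : groupByPartition(keys, partitions).entrySet())
            futures.add(batchGet.apply(e.getKey(), e.getValue()));

        // allOf completes only after every batch has completed, so join() below does not block.
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(ignored -> {
                List<R> res = new ArrayList<>();

                for (CompletableFuture<Collection<R>> f : futures)
                    res.addAll(f.join());

                return res;
            });
    }

    public static void main(String[] args) {
        List<Integer> keys = List.of(-7, -1, 0, 3, 10);

        // A fake partition that answers each key with a textual "row".
        Collection<String> rows = PartitionedGetAllSketch.<Integer, String>getAll(keys, 4, (part, batch) -> {
            List<String> out = new ArrayList<>();

            for (Integer k : batch)
                out.add("row-" + k);

            return CompletableFuture.completedFuture(out);
        }).join();

        System.out.println(rows.size());
    }
}
```

The sketch passes a lambda to `computeIfAbsent` instead of `HashSet::new`. With an `Integer` key, the method reference resolves to the `HashSet(int initialCapacity)` constructor, so the partition index would be used as the set's initial capacity.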







[GitHub] [ignite-3] vldpyatkov commented on a change in pull request #118: IGNITE-14239 Raft based implementation of atomic protocol

Posted by GitBox <gi...@apache.org>.
vldpyatkov commented on a change in pull request #118:
URL: https://github.com/apache/ignite-3/pull/118#discussion_r640356990



##########
File path: modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
##########
@@ -242,71 +271,232 @@ public void partitionedTable() {
             }
         });
 
-        for (int i = 0; i < PARTS * 10; i++) {
-            tbl.kvView().putIfAbsent(
-                tbl.kvView().tupleBuilder()
+        partitionedTableView(tbl, PARTS * 10);
+
+        partitionedTableKVBinaryView(tbl.kvView(), PARTS * 10);
+    }
+
+    /**
+     * Checks operation over row table view.
+     *
+     * @param view Table view.
+     * @param keysCnt Count of keys.
+     */
+    public void partitionedTableView(Table view, int keysCnt) {
+        LOG.info("Tes for Table view [keys=" + keysCnt + ']');

Review comment:
       Done.







[GitHub] [ignite-3] vldpyatkov commented on a change in pull request #118: IGNITE-14239 Raft based implementation of atomic protocol

Posted by GitBox <gi...@apache.org>.
vldpyatkov commented on a change in pull request #118:
URL: https://github.com/apache/ignite-3/pull/118#discussion_r640371217



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListener.java
##########
@@ -163,4 +308,26 @@ else if (clo.command() instanceof UpsertCommand) {
             return hash;
         }
     }
+
+    /**
+     * @param row Row.
+     * @return Extracted key.
+     */
+    @NotNull private boolean equalValues(@NotNull BinaryRow row, @NotNull BinaryRow row2) {

Review comment:
       Done.
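
   For reference, a minimal sketch of what a value-only comparison of two rows might look like. `RowLike` is a hypothetical stand-in for `BinaryRow`, and its `hasValue()` / `valueSlice()` accessors are assumptions made for illustration, not necessarily the methods the listener uses.

```java
import java.nio.ByteBuffer;

final class EqualValuesSketch {
    /** Hypothetical minimal row: exposes only what the comparison needs. */
    interface RowLike {
        boolean hasValue();

        ByteBuffer valueSlice();
    }

    /** Builds a test row; a null array models a key-only row. */
    static RowLike row(byte[] value) {
        return new RowLike() {
            @Override public boolean hasValue() {
                return value != null;
            }

            @Override public ByteBuffer valueSlice() {
                return ByteBuffer.wrap(value);
            }
        };
    }

    /** Returns true if both rows carry the same value bytes, or if neither has a value. */
    static boolean equalValues(RowLike row, RowLike row2) {
        if (row.hasValue() != row2.hasValue())
            return false;

        // ByteBuffer.equals compares the remaining bytes of both buffers.
        return !row.hasValue() || row.valueSlice().equals(row2.valueSlice());
    }
}
```

   Since the method returns a primitive `boolean`, a `@NotNull` annotation on it carries no information.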







[GitHub] [ignite-3] vldpyatkov commented on a change in pull request #118: IGNITE-14239 Raft based implementation of atomic protocol

Posted by GitBox <gi...@apache.org>.
vldpyatkov commented on a change in pull request #118:
URL: https://github.com/apache/ignite-3/pull/118#discussion_r640474247



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/CommandUtils.java
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.lang.IgniteLogger;
+
+/**
+ * This is an utility class for serialization cache tuples. It will be removed after another way for serialization is
+ * implemented into the network layer.

Review comment:
       This falls under the same ticket as before. I don't know its number.







[GitHub] [ignite-3] alievmirza commented on a change in pull request #118: IGNITE-14239 Raft based implementation of atomic protocol

Posted by GitBox <gi...@apache.org>.
alievmirza commented on a change in pull request #118:
URL: https://github.com/apache/ignite-3/pull/118#discussion_r638742491



##########
File path: modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
##########
@@ -0,0 +1,705 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.Row;
+import org.apache.ignite.internal.schema.RowAssembler;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAllCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndDeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndUpsertCommand;
+import org.apache.ignite.internal.table.distributed.command.GetCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertAllCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertCommand;
+import org.apache.ignite.internal.table.distributed.command.ReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.ReplaceIfExistCommand;
+import org.apache.ignite.internal.table.distributed.command.UpsertAllCommand;
+import org.apache.ignite.internal.table.distributed.command.UpsertCommand;
+import org.apache.ignite.internal.table.distributed.command.response.MultiRowsResponse;
+import org.apache.ignite.internal.table.distributed.command.response.SingleRowResponse;
+import org.apache.ignite.raft.client.Command;
+import org.apache.ignite.raft.client.service.CommandClosure;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * There are a tests for a table command listener.
+ */
+public class PartitionCommandListenerTest {
+    /** Key count. */
+    public static final int KEY_COUNT = 100;
+
+    /** Schema. */
+    public static SchemaDescriptor SCHEMA = new SchemaDescriptor(UUID.randomUUID(),
+        1,
+        new Column[] {new Column("key", NativeTypes.INTEGER, false)},
+        new Column[] {new Column("value", NativeTypes.INTEGER, false)}
+    );
+
+    /** Table command listener. */
+    private static PartitionCommandListener commandListener;
+
+    /**
+     * Inisializes a table listener before tests.
+     */
+    @BeforeAll
+    public static void before() {
+        commandListener = new PartitionCommandListener();
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testInsertCommands() {
+        readAndChak(false);
+
+        delete(false);
+
+        insert(false);
+
+        insert(true);
+
+        readAndChak(true);
+
+        delete(true);
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testUpsertValues() {
+        readAndChak(false);
+
+        upsert();
+
+        readAndChak(true);
+
+        delete(true);
+
+        readAndChak(false);
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testReplaceCommand() {
+        upsert();
+
+        deleteExactValues(false);
+
+        replaceValues(true);
+
+        readAndChak(true, i -> i + 1);
+
+        replaceValues(false);
+
+        readAndChak(true, i -> i + 1);
+
+        deleteExactValues(true);
+
+        readAndChak(false);
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testPutIfExistCommand() {
+        putIfExistValues(false);
+
+        readAndChak(false);
+
+        upsert();
+
+        putIfExistValues(true);
+
+        readAndChak(true, i -> i + 1);
+
+        getAndDeleteValues(true);
+
+        readAndChak(false);
+
+        getAndDeleteValues(false);
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testGetAndReplaceCommand() {
+        readAndChak(false);
+
+        getAndUpsertValues(false);
+
+        readAndChak(true);
+
+        getAndReplaceValues(true);
+
+        readAndChak(true, i -> i + 1);
+
+        getAndUpsertValues(true);
+
+        readAndChak(true);
+
+        deleteExactAllValues(true);
+
+        readAndChak(false);
+
+        getAndReplaceValues(false);
+
+        deleteExactAllValues(false);
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testUpsertRowsBatchedAndCheck() {
+        readAll(false);
+
+        deleteAll(false);
+
+        upsertAll();
+
+        readAll(true);
+
+        deleteAll(true);
+
+        readAll(false);
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testInsertRowsBatchedAndCheck() {
+        readAll(false);
+
+        deleteAll(false);
+
+        insertAll(false);
+
+        readAll(true);
+
+        insertAll(true);
+
+        deleteAll(true);
+
+        readAll(false);
+    }
+
+    /**
+     * Prepares a closure iterator for a specific batch operation.
+     *
+     * @param func The function prepare a closure for the operation.
+     * @param <T> Type of the operation.
+     * @return Closure iterator.
+     */
+    private <T extends Command> Iterator<CommandClosure<T>> batchIterator(Consumer<CommandClosure<T>> func) {
+        return new Iterator<CommandClosure<T>>() {
+            boolean moved;
+
+            @Override public boolean hasNext() {
+                return !moved;
+            }
+
+            @Override public CommandClosure<T> next() {
+                CommandClosure<T> clo = mock(CommandClosure.class);
+
+                doAnswer(invocation -> {
+                    fail("Exception happened: " + invocation.getArgument(0));
+
+                    return null;
+                }).when(clo).failure(any());
+
+                func.accept(clo);
+
+                moved = true;
+
+                return clo;
+            }
+        };
+    }
+
+    /**
+     * Prepares a closure iterator for a specific operation.
+     *
+     * @param func The function prepare a closure for the operation.
+     * @param <T> Type of the operation.
+     * @return Closure iterator.
+     */
+    private <T extends Command> Iterator<CommandClosure<T>> iterator(BiConsumer<Integer, CommandClosure<T>> func) {
+        return new Iterator<CommandClosure<T>>() {
+            /** Iteration. */
+            private int i = 0;
+
+            /** {@inheritDoc} */
+            @Override public boolean hasNext() {
+                return i < KEY_COUNT;
+            }
+
+            /** {@inheritDoc} */
+            @Override public CommandClosure<T> next() {
+                CommandClosure<T> clo = mock(CommandClosure.class);
+
+                doAnswer(invocation -> {
+                    fail("Exception happened: " + invocation.getArgument(0));
+
+                    return null;
+                }).when(clo).failure(any());
+
+                func.accept(i, clo);
+
+                i++;
+
+                return clo;
+            }
+        };
+    }
+
+    /**
+     * @param existed True if rows are existed, false otherwise.
+     */
+    private void insertAll(boolean existed) {
+        commandListener.onWrite(batchIterator(clo -> {
+            doAnswer(invocation -> {
+                MultiRowsResponse resp = invocation.getArgument(0);
+
+                if (existed) {
+                    assertEquals(KEY_COUNT, resp.getValues().size());
+
+                    for (BinaryRow binaryRow : resp.getValues()) {
+                        Row row = new Row(SCHEMA, binaryRow);
+
+                        int keyVal = row.intValue(0);
+
+                        assertTrue(keyVal < KEY_COUNT);
+                        assertEquals(keyVal, row.intValue(1));
+                    }
+                }
+                else
+                    assertTrue(resp.getValues().isEmpty());
+
+                return null;
+            }).when(clo).success(any(MultiRowsResponse.class));
+
+            Set<BinaryRow> rows = new HashSet<>(KEY_COUNT);
+
+            for (int i = 0; i < KEY_COUNT; i++)
+                rows.add(getTestRow(i, i));
+
+            when(clo.command()).thenReturn(new InsertAllCommand(rows));
+        }));
+    }
+
+    /**
+     * Upserts values from the listener in the batch operation.
+     */
+    private void upsertAll() {
+        commandListener.onWrite(batchIterator(clo -> {
+            doAnswer(invocation -> {
+                assertNull(invocation.getArgument(0));
+
+                return null;
+            }).when(clo).success(any());
+
+            Set<BinaryRow> rows = new HashSet<>(KEY_COUNT);
+
+            for (int i = 0; i < KEY_COUNT; i++)
+                rows.add(getTestRow(i, i));
+
+            when(clo.command()).thenReturn(new UpsertAllCommand(rows));
+        }));
+    }
+
+    /**
+     * @param existed True if rows are existed, false otherwise.
+     */
+    private void deleteAll(boolean existed) {
+        commandListener.onWrite(batchIterator(clo -> {
+            doAnswer(invocation -> {
+                MultiRowsResponse resp = invocation.getArgument(0);
+
+                if (existed) {
+                    assertEquals(KEY_COUNT, resp.getValues().size());
+
+                    for (BinaryRow binaryRow : resp.getValues()) {
+                        Row row = new Row(SCHEMA, binaryRow);
+
+                        int keyVal = row.intValue(0);
+
+                        assertTrue(keyVal < KEY_COUNT);
+                        assertEquals(keyVal, row.intValue(1));
+                    }
+                }
+                else
+                    assertTrue(resp.getValues().isEmpty());
+
+                return null;
+            }).when(clo).success(any(MultiRowsResponse.class));
+
+            Set<BinaryRow> keyRows = new HashSet<>(KEY_COUNT);
+
+            for (int i = 0; i < KEY_COUNT; i++)
+                keyRows.add(getTestKey(i));
+
+            when(clo.command()).thenReturn(new DeleteAllCommand(keyRows));
+        }));
+    }
+
+    /**
+     * @param existed True if rows are existed, false otherwise.
+     */
+    private void readAll(boolean existed) {
+        commandListener.onRead(batchIterator(clo -> {
+            doAnswer(invocation -> {
+                MultiRowsResponse resp = invocation.getArgument(0);
+
+                if (existed) {
+                    assertEquals(KEY_COUNT, resp.getValues().size());
+
+                    for (BinaryRow binaryRow : resp.getValues()) {
+                        Row row = new Row(SCHEMA, binaryRow);
+
+                        int keyVal = row.intValue(0);
+
+                        assertTrue(keyVal < KEY_COUNT);
+                        assertEquals(keyVal, row.intValue(1));
+                    }
+                }
+                else
+                    assertTrue(resp.getValues().isEmpty());
+
+                return null;
+            }).when(clo).success(any(MultiRowsResponse.class));
+
+            Set<BinaryRow> keyRows = new HashSet<>(KEY_COUNT);
+
+            for (int i = 0; i < KEY_COUNT; i++)
+                keyRows.add(getTestKey(i));
+
+            when(clo.command()).thenReturn(new GetAllCommand(keyRows));
+        }));
+    }
+
+    /**
+     * Upserts rows.
+     */
+    private void upsert() {
+        commandListener.onWrite(iterator((i, clo) -> {
+            when(clo.command()).thenReturn(new UpsertCommand(getTestRow(i, i)));
+
+            doAnswer(invocation -> {
+                assertNull(invocation.getArgument(0));
+
+                return null;
+            }).when(clo).success(any());
+        }));
+    }
+
+    /**
+     * @param existed True if rows are existed, false otherwise.
+     */
+    private void delete(boolean existed) {
+        commandListener.onWrite(iterator((i, clo) -> {
+            when(clo.command()).thenReturn(new DeleteCommand(getTestKey(i)));
+
+            doAnswer(invocation -> {
+                assertEquals(existed, invocation.getArgument(0));
+
+                return null;
+            }).when(clo).success(any());
+        }));
+    }
+
+    /**
+     * Reads rows from the listener and checks them.
+     *
+     * @param existed True if rows are existed, false otherwise.
+     */
+    private void readAndChak(boolean existed) {
+        readAndChak(existed, i -> i);
+    }
+
+    /**
+     * Reades rows from the listener and checks values as expected by a mapper.
+     *
+     * @param existed True if rows are existed, false otherwise.
+     * @param keyValueMapper Mapper a key to the value which will be expected.
+     */
+    private void readAndChak(boolean existed, Function<Integer, Integer> keyValueMapper) {

Review comment:
       Typo: this should be "check" (readAndCheck).







[GitHub] [ignite-3] vldpyatkov commented on a change in pull request #118: IGNITE-14239 Raft based implementation of atomic protocol

Posted by GitBox <gi...@apache.org>.
vldpyatkov commented on a change in pull request #118:
URL: https://github.com/apache/ignite-3/pull/118#discussion_r640464291



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
##########
@@ -71,13 +85,37 @@ public InternalTableImpl(
 

Review comment:
       I do not think that all partitions are present in the map.
   I think this may change in the future.







[GitHub] [ignite-3] vldpyatkov commented on a change in pull request #118: IGNITE-14239 Raft based implementation of atomic protocol

Posted by GitBox <gi...@apache.org>.
vldpyatkov commented on a change in pull request #118:
URL: https://github.com/apache/ignite-3/pull/118#discussion_r640366117



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java
##########
@@ -172,22 +175,34 @@ public KVBinaryViewImpl(InternalTable tbl, SchemaRegistry schemaReg) {
 
     /** {@inheritDoc} */
     @Override public Collection<Tuple> removeAll(Collection<Tuple> keys) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        Objects.requireNonNull(keys);
+
+        return sync(removeAllAsync(keys));
     }
 
     /** {@inheritDoc} */
-    @Override public @NotNull CompletableFuture<Tuple> removeAllAsync(Collection<Tuple> keys) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+    @Override public @NotNull CompletableFuture<Collection<Tuple>> removeAllAsync(Collection<Tuple> keys) {
+        Objects.requireNonNull(keys);
+
+        return tbl.deleteAll(keys.stream().map(k -> marsh.marshal(k, null)).collect(Collectors.toList()))
+            .thenApply(this::wrap)
+            .thenApply(ts -> ts.stream().filter(Objects::nonNull).map(TableRow::valueChunk).collect(Collectors.toList()));

Review comment:
       Done.
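
   To illustrate the shape of the change in this hunk: the synchronous method delegates to the asynchronous one, and the asynchronous one drops the entries that come back as null. The sketch below uses plain strings instead of tuples. `deleteAllAsync` is a hypothetical stand-in for the table call, and `join()` stands in for the view's `sync(...)` helper, whose exact behavior is not shown in this thread.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

final class RemoveAllSketch {
    /** Hypothetical async table call: yields null for keys that were not present. */
    static CompletableFuture<List<String>> deleteAllAsync(Collection<String> keys) {
        return CompletableFuture.supplyAsync(() -> keys.stream()
            .map(k -> k.startsWith("missing") ? null : "value-of-" + k)
            .collect(Collectors.toList()));
    }

    /** Async variant: validates the argument, then filters out null results. */
    static CompletableFuture<List<String>> removeAllAsync(Collection<String> keys) {
        Objects.requireNonNull(keys);

        return deleteAllAsync(keys)
            .thenApply(rows -> rows.stream().filter(Objects::nonNull).collect(Collectors.toList()));
    }

    /** Sync variant: simply waits for the async one (an assumption about what sync(...) does). */
    static List<String> removeAll(Collection<String> keys) {
        Objects.requireNonNull(keys);

        return removeAllAsync(keys).join();
    }

    public static void main(String[] args) {
        System.out.println(removeAll(Arrays.asList("a", "missing-b", "c")));
    }
}
```

   Keeping the logic in the async method only means the two variants cannot drift apart.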







[GitHub] [ignite-3] vldpyatkov commented on a change in pull request #118: IGNITE-14239 Raft based implementation of atomic protocol

Posted by GitBox <gi...@apache.org>.
vldpyatkov commented on a change in pull request #118:
URL: https://github.com/apache/ignite-3/pull/118#discussion_r640473690



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpsertAllCommand.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.raft.client.WriteCommand;
+
+/**
+ * The command puts a batch rows.
+ */
+public class UpsertAllCommand implements WriteCommand {
+    /** Rows. */
+    private transient Set<BinaryRow> rows;
+
+    /*
+     * Row bytes.
+     * It is a temporary solution, before network have not implement correct serialization BinaryRow.
+     * TODO: Remove the field after.

Review comment:
       I do not know of a ticket under which serialization will be moved to another message format.
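
   For context, the temporary approach (a transient collection of rows plus a byte array that is actually serialized) can be sketched roughly as follows. This is only an illustration under assumptions: rows are modelled as plain `byte[]`, the length-prefixed layout is invented for the example, and none of the names below come from `CommandUtils`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class TransientRowsSketch implements Serializable {
    /** Rows in their in-memory form; skipped by Java serialization. */
    private transient List<byte[]> rows;

    /** Rows packed into a single array; this is the part that travels. */
    private final byte[] rowsBytes;

    TransientRowsSketch(List<byte[]> rows) {
        this.rows = rows;
        this.rowsBytes = pack(rows);
    }

    /** Lazily restores the rows on the receiving side. */
    List<byte[]> getRows() {
        if (rows == null)
            rows = unpack(rowsBytes);

        return rows;
    }

    /** Hypothetical layout: row count, then (length, bytes) for every row. */
    static byte[] pack(List<byte[]> rows) {
        int size = Integer.BYTES;

        for (byte[] row : rows)
            size += Integer.BYTES + row.length;

        ByteBuffer buf = ByteBuffer.allocate(size);

        buf.putInt(rows.size());

        for (byte[] row : rows) {
            buf.putInt(row.length);
            buf.put(row);
        }

        return buf.array();
    }

    /** Reverses pack(). */
    static List<byte[]> unpack(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        int cnt = buf.getInt();
        List<byte[]> rows = new ArrayList<>(cnt);

        for (int i = 0; i < cnt; i++) {
            byte[] row = new byte[buf.getInt()];

            buf.get(row);
            rows.add(row);
        }

        return rows;
    }

    /** Serializes and deserializes the command, as a network hop would. */
    static TransientRowsSketch roundTrip(TransientRowsSketch cmd) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();

        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(cmd);
        }

        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (TransientRowsSketch) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        TransientRowsSketch copy = roundTrip(new TransientRowsSketch(Arrays.asList(new byte[] {1, 2}, new byte[] {3})));

        for (byte[] row : copy.getRows())
            System.out.println(Arrays.toString(row));
    }
}
```

   Once the network layer can serialize rows directly, the byte array and the lazy restore in the getter should both become unnecessary.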



