You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/06/01 11:40:11 UTC

[GitHub] [ignite-3] vldpyatkov commented on a change in pull request #118: IGNITE-14239 Raft based implementation of atomic protocol

vldpyatkov commented on a change in pull request #118:
URL: https://github.com/apache/ignite-3/pull/118#discussion_r643023520



##########
File path: modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
##########
@@ -0,0 +1,699 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.Row;
+import org.apache.ignite.internal.schema.RowAssembler;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAllCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndDeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndUpsertCommand;
+import org.apache.ignite.internal.table.distributed.command.GetCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertAllCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertCommand;
+import org.apache.ignite.internal.table.distributed.command.ReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.ReplaceIfExistCommand;
+import org.apache.ignite.internal.table.distributed.command.UpsertAllCommand;
+import org.apache.ignite.internal.table.distributed.command.UpsertCommand;
+import org.apache.ignite.internal.table.distributed.command.response.MultiRowsResponse;
+import org.apache.ignite.internal.table.distributed.command.response.SingleRowResponse;
+import org.apache.ignite.raft.client.Command;
+import org.apache.ignite.raft.client.service.CommandClosure;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for a table command listener.
+ */
+public class PartitionCommandListenerTest {
+    /** Key count: the number of keys (0 inclusive to {@code KEY_COUNT} exclusive) every helper operates on. */
+    public static final int KEY_COUNT = 100;
+
+    /** Schema with one INTEGER key column named "key" and one INTEGER value column named "value". */
+    public static SchemaDescriptor SCHEMA = new SchemaDescriptor(UUID.randomUUID(),
+        1,
+        new Column[] {new Column("key", NativeTypes.INTEGER, false)},
+        new Column[] {new Column("value", NativeTypes.INTEGER, false)}
+    );
+
+    /** Table command listener under test; it is static and assigned in {@link #before()}. */
+    private static PartitionListener commandListener;
+
+    /**
+     * Initializes a table listener before tests.
+     * The instance is stored in a static field, so the same listener is used by every test that reads the field.
+     */
+    @BeforeAll
+    public static void before() {
+        commandListener = new PartitionListener();
+    }
+
+    /**
+     * Inserts rows and checks them.
+     * The scenario first verifies that nothing can be read or deleted, then inserts twice
+     * (the second time with the {@code existed} flag set), reads the rows back and deletes them.
+     * All rows are removed before return.
+     */
+    @Test
+    public void testInsertCommands() {
+        readAndCheck(false);
+
+        delete(false);
+
+        insert(false);
+
+        insert(true);
+
+        readAndCheck(true);
+
+        delete(true);
+    }
+
+    /**
+     * Upserts rows and checks them.
+     * The scenario verifies that no rows are readable, upserts them, reads them back,
+     * deletes them and verifies that they are not readable any more.
+     * All rows are removed before return.
+     */
+    @Test
+    public void testUpsertValues() {
+        readAndCheck(false);
+
+        upsert();
+
+        readAndCheck(true);
+
+        delete(true);
+
+        readAndCheck(false);
+    }
+
+    /**
+     * Adds rows, replaces and checks them.
+     * After the first replace the rows are expected to be read with values mapped by {@code i -> i + 1};
+     * the same values are expected after the second replace call made with {@code false}.
+     * All rows are removed before return.
+     */
+    @Test
+    public void testReplaceCommand() {
+        upsert();
+
+        deleteExactValues(false);
+
+        replaceValues(true);
+
+        readAndCheck(true, i -> i + 1);
+
+        replaceValues(false);
+
+        readAndCheck(true, i -> i + 1);
+
+        deleteExactValues(true);
+
+        readAndCheck(false);
+    }
+
+    /**
+     * The test checks the PutIfExist command.
+     * The command is first applied while no rows are readable, then again after an upsert,
+     * when the rows are expected to be read with values mapped by {@code i -> i + 1}.
+     * All rows are removed before return.
+     */
+    @Test
+    public void testPutIfExistCommand() {
+        putIfExistValues(false);
+
+        readAndCheck(false);
+
+        upsert();
+
+        putIfExistValues(true);
+
+        readAndCheck(true, i -> i + 1);
+
+        getAndDeleteValues(true);
+
+        readAndCheck(false);
+
+        getAndDeleteValues(false);
+    }
+
+    /**
+     * The test checks the GetAndReplace command.
+     * The scenario alternates GetAndUpsert and GetAndReplace calls and, after each of them,
+     * reads the rows back with the mapper that matches the expected values.
+     * All rows are removed before return.
+     */
+    @Test
+    public void testGetAndReplaceCommand() {
+        readAndCheck(false);
+
+        getAndUpsertValues(false);
+
+        readAndCheck(true);
+
+        getAndReplaceValues(true);
+
+        readAndCheck(true, i -> i + 1);
+
+        getAndUpsertValues(true);
+
+        readAndCheck(true);
+
+        deleteExactAllValues(true);
+
+        readAndCheck(false);
+
+        getAndReplaceValues(false);
+
+        deleteExactAllValues(false);
+    }
+
+    /**
+     * The test checks a batch upsert command.
+     * The scenario verifies that batch read and batch delete find nothing, upserts the batch,
+     * reads it back, deletes it and verifies that a batch read finds nothing again.
+     * All rows are removed before return.
+     */
+    @Test
+    public void testUpsertRowsBatchedAndCheck() {
+        readAll(false);
+
+        deleteAll(false);
+
+        upsertAll();
+
+        readAll(true);
+
+        deleteAll(true);
+
+        readAll(false);
+    }
+
+    /**
+     * The test checks a batch insert command.
+     * The batch is inserted twice: the first time into an empty state and the second time
+     * when the rows already exist.
+     * All rows are removed before return.
+     */
+    @Test
+    public void testInsertRowsBatchedAndCheck() {
+        readAll(false);
+
+        deleteAll(false);
+
+        insertAll(false);
+
+        readAll(true);
+
+        insertAll(true);
+
+        deleteAll(true);
+
+        readAll(false);
+    }
+
+    /**
+     * Prepares a closure iterator for a specific batch operation.
+     * The iterator yields exactly one mocked closure; {@code func} is applied to the mock
+     * at the moment the closure is requested.
+     *
+     * @param func The function that prepares a closure for the operation.
+     * @param <T> Type of the operation.
+     * @return Closure iterator.
+     */
+    private <T extends Command> Iterator<CommandClosure<T>> batchIterator(Consumer<CommandClosure<T>> func) {
+        return new Iterator<CommandClosure<T>>() {
+            /** Whether the single closure has already been handed out. */
+            private boolean moved;
+
+            /** {@inheritDoc} */
+            @Override public boolean hasNext() {
+                return !moved;
+            }
+
+            /** {@inheritDoc} */
+            @Override public CommandClosure<T> next() {
+                // Honor the Iterator contract instead of silently producing an extra closure.
+                if (moved)
+                    throw new NoSuchElementException();
+
+                CommandClosure<T> clo = mock(CommandClosure.class);
+
+                func.accept(clo);
+
+                moved = true;
+
+                return clo;
+            }
+        };
+    }
+
+    /**
+     * Prepares a closure iterator for a specific operation.
+     * The iterator yields {@code KEY_COUNT} mocked closures; {@code func} is applied to every mock
+     * together with the zero-based iteration number at the moment the closure is requested.
+     *
+     * @param func The function that prepares a closure for the operation.
+     * @param <T> Type of the operation.
+     * @return Closure iterator.
+     */
+    private <T extends Command> Iterator<CommandClosure<T>> iterator(BiConsumer<Integer, CommandClosure<T>> func) {
+        return new Iterator<CommandClosure<T>>() {
+            /** Iteration. */
+            private int i = 0;
+
+            /** {@inheritDoc} */
+            @Override public boolean hasNext() {
+                return i < KEY_COUNT;
+            }
+
+            /** {@inheritDoc} */
+            @Override public CommandClosure<T> next() {
+                // Honor the Iterator contract instead of producing closures for indexes beyond KEY_COUNT.
+                if (!hasNext())
+                    throw new NoSuchElementException();
+
+                CommandClosure<T> clo = mock(CommandClosure.class);
+
+                func.accept(i, clo);
+
+                i++;
+
+                return clo;
+            }
+        };
+    }
+
+    /**
+     * Passes a single {@link InsertAllCommand} with {@code KEY_COUNT} rows, each built by
+     * {@code getTestRow(i, i)}, to the listener and verifies the response handed to the closure.
+     *
+     * @param existed True if rows are existed, false otherwise. When {@code true} the response is expected
+     *      to hold {@code KEY_COUNT} rows whose value equals the key; otherwise it is expected to be empty.
+     */
+    private void insertAll(boolean existed) {
+        commandListener.onWrite(batchIterator(closure -> {
+            Set<BinaryRow> batch = new HashSet<>(KEY_COUNT);
+
+            for (int idx = 0; idx < KEY_COUNT; idx++)
+                batch.add(getTestRow(idx, idx));
+
+            when(closure.command()).thenReturn(new InsertAllCommand(batch));
+
+            doAnswer(invocation -> {
+                MultiRowsResponse resp = invocation.getArgument(0);
+
+                if (!existed) {
+                    assertTrue(resp.getValues().isEmpty());
+
+                    return null;
+                }
+
+                assertEquals(KEY_COUNT, resp.getValues().size());
+
+                for (BinaryRow returned : resp.getValues()) {
+                    Row row = new Row(SCHEMA, returned);
+
+                    int key = row.intValue(0);
+
+                    assertTrue(key < KEY_COUNT);
+                    assertEquals(key, row.intValue(1));
+                }
+
+                return null;
+            }).when(closure).result(any(MultiRowsResponse.class));
+        }));
+    }
+
+    /**
+     * Upserts values from the listener in the batch operation.
+     * A single {@link UpsertAllCommand} with {@code KEY_COUNT} rows, each built by {@code getTestRow(i, i)},
+     * is passed to the listener; the result handed to the closure is expected to be {@code null}.
+     */
+    private void upsertAll() {
+        commandListener.onWrite(batchIterator(closure -> {
+            Set<BinaryRow> batch = new HashSet<>(KEY_COUNT);
+
+            for (int idx = 0; idx < KEY_COUNT; idx++)
+                batch.add(getTestRow(idx, idx));
+
+            when(closure.command()).thenReturn(new UpsertAllCommand(batch));
+
+            doAnswer(invocation -> {
+                assertNull(invocation.getArgument(0));
+
+                return null;
+            }).when(closure).result(any());
+        }));
+    }
+
+    /**
+     * Passes a single {@link DeleteAllCommand} with {@code KEY_COUNT} keys, each built by
+     * {@code getTestKey(i)}, to the listener and verifies the response handed to the closure.
+     *
+     * @param existed True if rows are existed, false otherwise. When {@code true} the response is expected
+     *      to hold {@code KEY_COUNT} rows whose value equals the key; otherwise it is expected to be empty.
+     */
+    private void deleteAll(boolean existed) {
+        commandListener.onWrite(batchIterator(closure -> {
+            Set<BinaryRow> keys = new HashSet<>(KEY_COUNT);
+
+            for (int idx = 0; idx < KEY_COUNT; idx++)
+                keys.add(getTestKey(idx));
+
+            when(closure.command()).thenReturn(new DeleteAllCommand(keys));
+
+            doAnswer(invocation -> {
+                MultiRowsResponse resp = invocation.getArgument(0);
+
+                if (!existed) {
+                    assertTrue(resp.getValues().isEmpty());
+
+                    return null;
+                }
+
+                assertEquals(KEY_COUNT, resp.getValues().size());
+
+                for (BinaryRow returned : resp.getValues()) {
+                    Row row = new Row(SCHEMA, returned);
+
+                    int key = row.intValue(0);
+
+                    assertTrue(key < KEY_COUNT);
+                    assertEquals(key, row.intValue(1));
+                }
+
+                return null;
+            }).when(closure).result(any(MultiRowsResponse.class));
+        }));
+    }
+
+    /**
+     * Passes a single {@link GetAllCommand} with {@code KEY_COUNT} keys, each built by
+     * {@code getTestKey(i)}, to the listener's read path and verifies the response handed to the closure.
+     *
+     * @param existed True if rows are existed, false otherwise. When {@code true} the response is expected
+     *      to hold {@code KEY_COUNT} rows whose value equals the key; otherwise it is expected to be empty.
+     */
+    private void readAll(boolean existed) {
+        commandListener.onRead(batchIterator(closure -> {
+            Set<BinaryRow> keys = new HashSet<>(KEY_COUNT);
+
+            for (int idx = 0; idx < KEY_COUNT; idx++)
+                keys.add(getTestKey(idx));
+
+            when(closure.command()).thenReturn(new GetAllCommand(keys));
+
+            doAnswer(invocation -> {
+                MultiRowsResponse resp = invocation.getArgument(0);
+
+                if (!existed) {
+                    assertTrue(resp.getValues().isEmpty());
+
+                    return null;
+                }
+
+                assertEquals(KEY_COUNT, resp.getValues().size());
+
+                for (BinaryRow returned : resp.getValues()) {
+                    Row row = new Row(SCHEMA, returned);
+
+                    int key = row.intValue(0);
+
+                    assertTrue(key < KEY_COUNT);
+                    assertEquals(key, row.intValue(1));
+                }
+
+                return null;
+            }).when(closure).result(any(MultiRowsResponse.class));
+        }));
+    }
+
+    /**
+     * Upserts rows.
+     * One {@link UpsertCommand} per iteration is passed to the listener, each carrying the row built by
+     * {@code getTestRow(i, i)}; the result handed to every closure is expected to be {@code null}.
+     */
+    private void upsert() {
+        commandListener.onWrite(iterator((idx, closure) -> {
+            doAnswer(invocation -> {
+                assertNull(invocation.getArgument(0));
+
+                return null;
+            }).when(closure).result(any());
+
+            when(closure.command()).thenReturn(new UpsertCommand(getTestRow(idx, idx)));
+        }));
+    }
+
+    /**
+     * Passes one {@link DeleteCommand} per iteration to the listener, each carrying the key built by
+     * {@code getTestKey(i)}, and checks that the result handed to every closure equals {@code existed}.
+     *
+     * @param existed True if rows are existed, false otherwise.
+     */
+    private void delete(boolean existed) {
+        commandListener.onWrite(iterator((idx, closure) -> {
+            doAnswer(invocation -> {
+                assertEquals(existed, invocation.getArgument(0));
+
+                return null;
+            }).when(closure).result(any());
+
+            when(closure.command()).thenReturn(new DeleteCommand(getTestKey(idx)));
+        }));
+    }
+
+    /**
+     * Reads rows from the listener and checks them.
+     * Delegates to the two-argument overload, passing the mapper {@code i -> i}.
+     *
+     * @param existed True if rows are existed, false otherwise.
+     */
+    private void readAndCheck(boolean existed) {
+        readAndCheck(existed, i -> i);
+    }
+
+    /**
+     * Reads rows from the listener and checks values as expected by a mapper.

Review comment:
       Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org