You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2021/10/27 09:42:08 UTC

[ignite-3] 03/04: Implement get/put operations.

This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-15783
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 2b45cb9f1f9723bb5ce49967f54ada32cc150f0a
Author: Andrew Mashenkov <an...@gmail.com>
AuthorDate: Tue Oct 26 16:00:32 2021 +0300

    Implement get/put operations.
---
 .../org/apache/ignite/table/mapper/Mapper.java     |   7 +
 .../marshaller/reflection/JavaSerializer.java      |   7 +-
 .../internal/schema/marshaller/KVSerializer.java   |   8 +-
 .../ignite/internal/table/KeyValueViewImpl.java    |  63 +++-
 .../internal/table/KeyValueOperationsTest.java     | 385 +++++++++++++++++++++
 5 files changed, 459 insertions(+), 11 deletions(-)

diff --git a/modules/api/src/main/java/org/apache/ignite/table/mapper/Mapper.java b/modules/api/src/main/java/org/apache/ignite/table/mapper/Mapper.java
index 7210378..239df2a 100644
--- a/modules/api/src/main/java/org/apache/ignite/table/mapper/Mapper.java
+++ b/modules/api/src/main/java/org/apache/ignite/table/mapper/Mapper.java
@@ -27,6 +27,13 @@ import org.apache.ignite.table.Tuple;
  */
 public interface Mapper<R> {
     /**
+     * Returns the class of the objects this mapper maps.
+     *
+     * @return Mapped type.
+     */
+    Class<R> getType();
+
+    /**
      * Mapper builder.
      *
      * @param <T> Type.
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/JavaSerializer.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/JavaSerializer.java
index c7da543..0c18aaa 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/JavaSerializer.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/JavaSerializer.java
@@ -69,7 +69,9 @@ public class JavaSerializer extends AbstractSerializer {
         assert val == null || valClass.isInstance(val);
 
         keyMarsh.writeObject(key, asm);
-        valMarsh.writeObject(val, asm);
+
+        if (val != null)
+            valMarsh.writeObject(val, asm);
 
         return asm.toBytes();
     }
@@ -127,6 +129,9 @@ public class JavaSerializer extends AbstractSerializer {
 
     /** {@inheritDoc} */
     @Override protected Object deserializeValue0(Row row) throws SerializationException {
+        if (!row.hasValue())
+            return null;
+
         final Object o = valMarsh.readObject(row);
 
         assert o == null || valClass.isInstance(o);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/KVSerializer.java b/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/KVSerializer.java
index f13e20a..483f93c 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/KVSerializer.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/KVSerializer.java
@@ -17,7 +17,7 @@
 
 package org.apache.ignite.internal.schema.marshaller;
 
-import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.schema.BinaryRow;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
@@ -30,17 +30,17 @@ public interface KVSerializer<K, V> {
      * @param val Value object to serialize.
      * @return Table row with columns serialized from given key-value pair.
      */
-    Row serialize(@NotNull K key, V val);
+    BinaryRow serialize(@NotNull K key, V val);
 
     /**
      * @param row Table row.
      * @return Deserialized key object.
      */
-    @NotNull K deserializeKey(@NotNull Row row);
+    @NotNull K deserializeKey(@NotNull BinaryRow row);
 
     /**
      * @param row Table row.
      * @return Deserialized value object.
      */
-    @Nullable V deserializeValue(@NotNull Row row);
+    @Nullable V deserializeValue(@NotNull BinaryRow row);
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
index 44ac174..4193198 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
@@ -18,15 +18,22 @@
 package org.apache.ignite.internal.table;
 
 import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.util.Collection;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.internal.schema.marshaller.KVSerializer;
+import org.apache.ignite.internal.schema.marshaller.SerializationException;
+import org.apache.ignite.internal.schema.marshaller.Serializer;
+import org.apache.ignite.internal.schema.marshaller.SerializerFactory;
 import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.table.InvokeProcessor;
 import org.apache.ignite.table.KeyValueView;
 import org.apache.ignite.table.mapper.Mapper;
@@ -38,8 +45,15 @@ import org.jetbrains.annotations.Nullable;
  * Key-value view implementation.
  */
 public class KeyValueViewImpl<K, V> extends AbstractTableView implements KeyValueView<K, V> {
+    /** Key object mapper. */
+    private Mapper<K> keyMapper;
+
+    /** Value object mapper. */
+    private Mapper<V> valueMapper;
+
     /**
      * Constructor.
+     *
      * @param tbl Table storage.
      * @param schemaReg Schema registry.
      * @param keyMapper Key class mapper.
@@ -49,6 +63,9 @@ public class KeyValueViewImpl<K, V> extends AbstractTableView implements KeyValu
     public KeyValueViewImpl(InternalTable tbl, SchemaRegistry schemaReg, Mapper<K> keyMapper,
                             Mapper<V> valueMapper, @Nullable Transaction tx) {
         super(tbl, schemaReg, tx);
+
+        this.keyMapper = keyMapper;
+        this.valueMapper = valueMapper;
     }
 
     /** {@inheritDoc} */
@@ -62,11 +79,10 @@ public class KeyValueViewImpl<K, V> extends AbstractTableView implements KeyValu
 
         final KVSerializer<K, V> marsh = marshaller();
 
-        Row kRow = marsh.serialize(key, null); // Convert to portable format to pass TX/storage layer.
+        BinaryRow kRow = marsh.serialize(key, null); // Convert to portable format to pass TX/storage layer.
 
         return tbl.get(kRow, tx)
-            .thenApply(this::wrap) // Binary -> schema-aware row
-            .thenApply(marsh::deserializeValue); // row -> deserialized obj.
+            .thenApply(v -> v == null ? null : marsh.deserializeValue(v)); // row -> deserialized obj.
     }
 
     /** {@inheritDoc} */
@@ -91,12 +107,19 @@ public class KeyValueViewImpl<K, V> extends AbstractTableView implements KeyValu
 
     /** {@inheritDoc} */
     @Override public void put(@NotNull K key, V val) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        sync(putAsync(key, val));
     }
 
     /** {@inheritDoc} */
     @Override public @NotNull CompletableFuture<Void> putAsync(@NotNull K key, V val) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        Objects.requireNonNull(key);
+
+        // Turn the pair into a binary row that can be passed to the TX/storage layer.
+        final BinaryRow row = marshaller().serialize(key, val);
+
+        // Only completion is exposed to the caller, so the upsert result is dropped.
+        return tbl.upsert(row, tx)
+            .thenAccept(ignored -> { });
     }
 
     /** {@inheritDoc} */
@@ -239,7 +262,35 @@ public class KeyValueViewImpl<K, V> extends AbstractTableView implements KeyValu
      * @return Marshaller.
      */
     private KVSerializer<K, V> marshaller() {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        SerializerFactory factory = SerializerFactory.createJavaSerializerFactory();
+        // NOTE(review): the factory and serializer are rebuilt on every call - consider caching them.
+        return new KVSerializer<K, V>() {
+            private final Serializer s = factory.create(schemaReg.schema(), keyMapper.getType(), valueMapper.getType());
+
+            @Override public BinaryRow serialize(@NotNull K key, V val) {
+                try {
+                    return new ByteBufferRow(ByteBuffer.wrap(s.serialize(key, val)).order(ByteOrder.LITTLE_ENDIAN));
+                } catch (SerializationException e) {
+                    throw new IgniteException(e);
+                }
+            }
+
+            @NotNull @Override public K deserializeKey(@NotNull BinaryRow row) {
+                try {
+                    return s.deserializeKey(row.bytes());
+                } catch (SerializationException e) {
+                    throw new IgniteException(e);
+                }
+            }
+
+            @Nullable @Override public V deserializeValue(@NotNull BinaryRow row) {
+                try {
+                    return s.deserializeValue(row.bytes());
+                } catch (SerializationException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        };
     }
 
     /**
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueOperationsTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueOperationsTest.java
new file mode 100644
index 0000000..12fc840
--- /dev/null
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueOperationsTest.java
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
+import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.mapper.Mapper;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Basic key-value view operations test.
+ * <p>
+ * TODO: IGNITE-14487 Add bulk operations tests.
+ * TODO: IGNITE-14487 Add async operations tests.
+ */
+public class KeyValueOperationsTest {
+    /**
+     * Checks that {@code put} inserts and overwrites a value, and that putting {@code null} makes {@code get} return {@code null}.
+     */
+    @Test
+    public void put() {
+        SchemaDescriptor schema = new SchemaDescriptor(
+            1,
+            new Column[] {new Column("id", NativeTypes.INT64, false)},
+            new Column[] {new Column("val", NativeTypes.INT64, false)}
+        );
+
+        Mapper<Long> mapper = new Mapper<>() {
+            @Override public Class<Long> getType() {
+                return Long.class;
+            }
+        };
+
+        KeyValueView<Long, Long> tbl =
+            new KeyValueViewImpl<>(new DummyInternalTableImpl(), new DummySchemaManagerImpl(schema), mapper, mapper, null);
+
+        assertNull(tbl.get(1L));
+
+        // Put KV pair.
+        tbl.put(1L, 11L);
+
+        assertEquals(11L, tbl.get(1L));
+        assertEquals(11L, tbl.get(1L));
+
+        // Update KV pair.
+        tbl.put(1L, 22L);
+
+        assertEquals(22L, tbl.get(1L));
+        assertEquals( 22L, tbl.get(1L));
+
+        // Remove KV pair by putting a null value.
+        tbl.put(1L, null);
+
+        assertNull(tbl.get(1L));
+
+        // Put KV pair again after removal.
+        tbl.put(1L, 33L);
+        assertEquals(33L, tbl.get(1L));
+    }
+
+    /**
+     * Checks that {@code putIfAbsent} inserts only when the key is missing and leaves an existing value untouched.
+     */
+    @Test
+    public void putIfAbsent() {
+        SchemaDescriptor schema = new SchemaDescriptor(
+            1,
+            new Column[] {new Column("id", NativeTypes.INT64, false)},
+            new Column[] {new Column("val", NativeTypes.INT64, false)}
+        );
+
+        KeyValueView<Tuple, Tuple> tbl =
+            new KeyValueBinaryViewImpl(new DummyInternalTableImpl(), new DummySchemaManagerImpl(schema), null, null);
+
+        final Tuple key = Tuple.create().set("id", 1L);
+        final Tuple val = Tuple.create().set("val", 11L);
+        final Tuple val2 = Tuple.create().set("val", 22L);
+
+        assertNull(tbl.get(key));
+
+        // Insert new KV pair.
+        assertTrue(tbl.putIfAbsent(key, val));
+
+        assertEqualsValues(schema, val, tbl.get(key));
+        assertEqualsValues(schema, val, tbl.get(Tuple.create().set("id", 1L)));
+
+        // Existing pair must not be overwritten.
+        assertFalse(tbl.putIfAbsent(key, val2));
+
+        assertEqualsValues(schema, val, tbl.get(key));
+        assertEqualsValues(schema, val, tbl.get(Tuple.create().set("id", 1L)));
+    }
+
+    /**
+     * Checks that {@code getAndPut} stores the new value and returns the previous one ({@code null} for a missing key).
+     */
+    @Test
+    public void getAndPut() {
+        SchemaDescriptor schema = new SchemaDescriptor(
+            1,
+            new Column[] {new Column("id", NativeTypes.INT64, false)},
+            new Column[] {new Column("val", NativeTypes.INT64, false)}
+        );
+
+        KeyValueView<Tuple, Tuple> tbl =
+            new KeyValueBinaryViewImpl(new DummyInternalTableImpl(), new DummySchemaManagerImpl(schema), null, null);
+
+        final Tuple key = Tuple.create().set("id", 1L);
+        final Tuple val = Tuple.create().set("val", 11L);
+        final Tuple val2 = Tuple.create().set("val", 22L);
+        final Tuple val3 = Tuple.create().set("val", 33L);
+
+        assertNull(tbl.get(key));
+
+        // Insert new KV pair; there is no previous value to return.
+        assertNull(tbl.getAndPut(key, val));
+
+        assertEqualsValues(schema, val, tbl.get(key));
+        assertEqualsValues(schema, val, tbl.get(Tuple.create().set("id", 1L)));
+
+        assertEqualsValues(schema, val, tbl.getAndPut(key, val2));
+        assertEqualsValues(schema, val2, tbl.getAndPut(key, Tuple.create().set("val", 33L)));
+
+        assertEqualsValues(schema, val3, tbl.get(key));
+        assertNull(tbl.get(Tuple.create().set("id", 2L)));
+    }
+
+    /**
+     * Checks that {@code remove(key)} deletes an existing pair and reports whether anything was actually removed.
+     */
+    @Test
+    public void remove() {
+        SchemaDescriptor schema = new SchemaDescriptor(
+            1,
+            new Column[] {new Column("id", NativeTypes.INT64, false)},
+            new Column[] {new Column("val", NativeTypes.INT64, false)}
+        );
+
+        KeyValueView<Tuple, Tuple> tbl =
+            new KeyValueBinaryViewImpl(new DummyInternalTableImpl(), new DummySchemaManagerImpl(schema), null, null);
+
+        final Tuple key = Tuple.create().set("id", 1L);
+        final Tuple key2 = Tuple.create().set("id", 2L);
+        final Tuple val = Tuple.create().set("val", 11L);
+        final Tuple val2 = Tuple.create().set("val", 22L);
+
+        // Put KV pair.
+        tbl.put(key, val);
+
+        // Delete an existing key.
+        assertEqualsValues(schema, val, tbl.get(key));
+        assertTrue(tbl.remove(key));
+        assertNull(tbl.get(key));
+
+        // Delete an already deleted key.
+        assertFalse(tbl.remove(key));
+
+        // Put KV pair.
+        tbl.put(key, val2);
+        assertEqualsValues(schema, val2, tbl.get(key));
+
+        // Delete an existing key using an equal key tuple.
+        assertTrue(tbl.remove(Tuple.create().set("id", 1L)));
+        assertNull(tbl.get(key));
+
+        // Delete a key that never existed.
+        assertNull(tbl.get(key2));
+        assertFalse(tbl.remove(key2));
+    }
+
+    /**
+     * Checks that {@code remove(key, val)} removes only a pair with a matching value and throws when the value is {@code null}.
+     */
+    @Test
+    public void removeExact() {
+        SchemaDescriptor schema = new SchemaDescriptor(
+            1,
+            new Column[] {new Column("id", NativeTypes.INT64, false)},
+            new Column[] {new Column("val", NativeTypes.INT64, false)}
+        );
+
+        final KeyValueView<Tuple, Tuple> tbl =
+            new KeyValueBinaryViewImpl(new DummyInternalTableImpl(), new DummySchemaManagerImpl(schema), null, null);
+
+        final Tuple key = Tuple.create().set("id", 1L);
+        final Tuple key2 = Tuple.create().set("id", 2L);
+        final Tuple val = Tuple.create().set("val", 11L);
+        final Tuple val2 = Tuple.create().set("val", 22L);
+
+        // Put KV pair.
+        tbl.put(key, val);
+        assertEqualsValues(schema, val, tbl.get(key));
+
+        // Fails to delete KV pair with unexpected value.
+        assertFalse(tbl.remove(key, val2));
+        assertEqualsValues(schema, val, tbl.get(key));
+
+        // Delete KV pair with expected value.
+        assertTrue(tbl.remove(key, val));
+        assertNull(tbl.get(key));
+
+        // Once again.
+        assertFalse(tbl.remove(key, val));
+        assertNull(tbl.get(key));
+
+        // A null expected value is rejected when the key is missing.
+        assertThrows(Exception.class, () -> tbl.remove(key, null));
+        assertNull(tbl.get(key));
+
+        // Put KV pair.
+        tbl.put(key, val2);
+        assertEqualsValues(schema, val2, tbl.get(key));
+
+        // A null expected value is rejected when the key exists; the pair stays intact.
+        assertThrows(Exception.class, () -> tbl.remove(key, null));
+        assertEqualsValues(schema, val2, tbl.get(key));
+
+        // Delete KV pair with expected value.
+        assertTrue(tbl.remove(key, val2));
+        assertNull(tbl.get(key));
+
+        assertFalse(tbl.remove(key2, val2));
+        assertNull(tbl.get(key2));
+    }
+
+    /**
+     * Checks that {@code replace(key, val)} affects existing keys only and that a {@code null} value removes the pair.
+     */
+    @Test
+    public void replace() {
+        SchemaDescriptor schema = new SchemaDescriptor(
+            1,
+            new Column[] {new Column("id", NativeTypes.INT64, false)},
+            new Column[] {new Column("val", NativeTypes.INT64, false)}
+        );
+
+        KeyValueView<Tuple, Tuple> tbl =
+            new KeyValueBinaryViewImpl(new DummyInternalTableImpl(), new DummySchemaManagerImpl(schema), null, null);
+
+        final Tuple key = Tuple.create().set("id", 1L);
+        final Tuple key2 = Tuple.create().set("id", 2L);
+        final Tuple val = Tuple.create().set("val", 11L);
+        final Tuple val2 = Tuple.create().set("val", 22L);
+        final Tuple val3 = Tuple.create().set("val", 33L);
+
+        // Replace is a no-op for a missing KV pair.
+        assertFalse(tbl.replace(key, val));
+        assertNull(tbl.get(key));
+
+        tbl.put(key, val);
+
+        // Replace an existing KV pair.
+        assertTrue(tbl.replace(key, val2));
+        assertEqualsValues(schema, val2, tbl.get(key));
+
+        // Remove an existing KV pair by replacing it with null.
+        assertTrue(tbl.replace(key, null));
+        assertNull(tbl.get(key));
+
+        // Replace is a no-op again once the pair is gone.
+        assertFalse(tbl.replace(key, val3));
+        assertNull(tbl.get(key));
+
+        tbl.put(key, val3);
+        assertEqualsValues(schema, val3, tbl.get(key));
+
+        // Replacing a missing KV pair with null changes nothing.
+        assertFalse(tbl.replace(key2, null));
+        assertNull(tbl.get(key2));
+    }
+
+    /**
+     * Checks {@code replace(key, oldVal, newVal)}: {@code null} old value matches a missing key, {@code null} new value removes the pair.
+     */
+    @Test
+    public void replaceExact() {
+        SchemaDescriptor schema = new SchemaDescriptor(
+            1,
+            new Column[] {new Column("id", NativeTypes.INT64, false)},
+            new Column[] {new Column("val", NativeTypes.INT64, false)}
+        );
+
+        KeyValueView<Tuple, Tuple> tbl =
+            new KeyValueBinaryViewImpl(new DummyInternalTableImpl(), new DummySchemaManagerImpl(schema), null, null);
+
+        final Tuple key = Tuple.create().set("id", 1L);
+        final Tuple key2 = Tuple.create().set("id", 2L);
+        final Tuple val = Tuple.create().set("val", 11L);
+        final Tuple val2 = Tuple.create().set("val", 22L);
+        final Tuple val3 = Tuple.create().set("val", 33L);
+
+        // Insert KV pair: a null old value matches the missing key.
+        assertTrue(tbl.replace(key, null, val));
+        assertEqualsValues(schema, val, tbl.get(key));
+        assertNull(tbl.get(key2));
+
+        // Replace is a no-op for a missing KV pair.
+        assertFalse(tbl.replace(key2, val, val2));
+        assertNull(tbl.get(key2));
+
+        // Replace an existing KV pair.
+        assertTrue(tbl.replace(key, val, val2));
+        assertEqualsValues(schema, val2, tbl.get(key));
+
+        // Remove an existing KV pair by replacing it with null.
+        assertTrue(tbl.replace(key, val2, null));
+        assertNull(tbl.get(key));
+
+        // Insert KV pair.
+        assertTrue(tbl.replace(key, null, val3));
+        assertEqualsValues(schema, val3, tbl.get(key));
+
+        // Null-to-null replace on a missing KV pair is expected to succeed.
+        assertTrue(tbl.replace(key2, null, null));
+    }
+
+    /**
+     * Check key columns equality and that at least one key column is non-null.
+     *
+     * @param schema Schema.
+     * @param expected Expected tuple.
+     * @param actual Actual tuple.
+     */
+    void assertEqualsKeys(SchemaDescriptor schema, Tuple expected, Tuple actual) {
+        int nonNullKey = 0;
+
+        for (int i = 0; i < schema.keyColumns().length(); i++) {
+            final Column col = schema.keyColumns().column(i);
+
+            final Object val1 = expected.value(col.name());
+            final Object val2 = actual.value(col.name());
+
+            assertEquals(val1, val2, "Key columns equality check failed: colIdx=" + col.schemaIndex());
+
+            if (schema.isKeyColumn(i) && val1 != null)
+                nonNullKey++;
+        }
+
+        assertTrue(nonNullKey > 0, "At least one non-null key column must exist.");
+    }
+
+    /**
+     * Check value columns equality, column by column.
+     *
+     * @param schema Schema.
+     * @param expected Expected tuple.
+     * @param actual Actual tuple.
+     */
+    void assertEqualsValues(SchemaDescriptor schema, Tuple expected, Tuple actual) {
+        for (int i = 0; i < schema.valueColumns().length(); i++) {
+            final Column col = schema.valueColumns().column(i);
+
+            final Object val1 = expected.value(col.name());
+            final Object val2 = actual.value(col.name());
+
+            assertEquals(val1, val2, "Value columns equality check failed: colIdx=" + col.schemaIndex());
+        }
+    }
+}