You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2021/10/27 09:42:08 UTC
[ignite-3] 03/04: Implement get/put operations.
This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch ignite-15783
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 2b45cb9f1f9723bb5ce49967f54ada32cc150f0a
Author: Andrew Mashenkov <an...@gmail.com>
AuthorDate: Tue Oct 26 16:00:32 2021 +0300
Implement get/put operations.
---
.../org/apache/ignite/table/mapper/Mapper.java | 7 +
.../marshaller/reflection/JavaSerializer.java | 7 +-
.../internal/schema/marshaller/KVSerializer.java | 8 +-
.../ignite/internal/table/KeyValueViewImpl.java | 63 +++-
.../internal/table/KeyValueOperationsTest.java | 385 +++++++++++++++++++++
5 files changed, 459 insertions(+), 11 deletions(-)
diff --git a/modules/api/src/main/java/org/apache/ignite/table/mapper/Mapper.java b/modules/api/src/main/java/org/apache/ignite/table/mapper/Mapper.java
index 7210378..239df2a 100644
--- a/modules/api/src/main/java/org/apache/ignite/table/mapper/Mapper.java
+++ b/modules/api/src/main/java/org/apache/ignite/table/mapper/Mapper.java
@@ -27,6 +27,13 @@ import org.apache.ignite.table.Tuple;
*/
public interface Mapper<R> {
/**
+ * Return mapped type.
+ *
+ * @return Mapped type.
+ */
+ Class<R> getType();
+
+ /**
* Mapper builder.
*
* @param <T> Type.
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/JavaSerializer.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/JavaSerializer.java
index c7da543..0c18aaa 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/JavaSerializer.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/JavaSerializer.java
@@ -69,7 +69,9 @@ public class JavaSerializer extends AbstractSerializer {
assert val == null || valClass.isInstance(val);
keyMarsh.writeObject(key, asm);
- valMarsh.writeObject(val, asm);
+
+ if (val != null)
+ valMarsh.writeObject(val, asm);
return asm.toBytes();
}
@@ -127,6 +129,9 @@ public class JavaSerializer extends AbstractSerializer {
/** {@inheritDoc} */
@Override protected Object deserializeValue0(Row row) throws SerializationException {
+ if (!row.hasValue())
+ return null;
+
final Object o = valMarsh.readObject(row);
assert o == null || valClass.isInstance(o);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/KVSerializer.java b/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/KVSerializer.java
index f13e20a..483f93c 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/KVSerializer.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/KVSerializer.java
@@ -17,7 +17,7 @@
package org.apache.ignite.internal.schema.marshaller;
-import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.schema.BinaryRow;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -30,17 +30,17 @@ public interface KVSerializer<K, V> {
* @param val Value object to serialize.
* @return Table row with columns serialized from given key-value pair.
*/
- Row serialize(@NotNull K key, V val);
+ BinaryRow serialize(@NotNull K key, V val);
/**
* @param row Table row.
* @return Deserialized key object.
*/
- @NotNull K deserializeKey(@NotNull Row row);
+ @NotNull K deserializeKey(@NotNull BinaryRow row);
/**
* @param row Table row.
* @return Deserialized value object.
*/
- @Nullable V deserializeValue(@NotNull Row row);
+ @Nullable V deserializeValue(@NotNull BinaryRow row);
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
index 44ac174..4193198 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
@@ -18,15 +18,22 @@
package org.apache.ignite.internal.table;
import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.schema.marshaller.KVSerializer;
+import org.apache.ignite.internal.schema.marshaller.SerializationException;
+import org.apache.ignite.internal.schema.marshaller.Serializer;
+import org.apache.ignite.internal.schema.marshaller.SerializerFactory;
import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.table.InvokeProcessor;
import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.mapper.Mapper;
@@ -38,8 +45,15 @@ import org.jetbrains.annotations.Nullable;
* Key-value view implementation.
*/
public class KeyValueViewImpl<K, V> extends AbstractTableView implements KeyValueView<K, V> {
+ /** Key object mapper. */
+ private Mapper<K> keyMapper;
+
+ /** Value object mapper. */
+ private Mapper<V> valueMapper;
+
/**
* Constructor.
+ *
* @param tbl Table storage.
* @param schemaReg Schema registry.
* @param keyMapper Key class mapper.
@@ -49,6 +63,9 @@ public class KeyValueViewImpl<K, V> extends AbstractTableView implements KeyValu
public KeyValueViewImpl(InternalTable tbl, SchemaRegistry schemaReg, Mapper<K> keyMapper,
Mapper<V> valueMapper, @Nullable Transaction tx) {
super(tbl, schemaReg, tx);
+
+ this.keyMapper = keyMapper;
+ this.valueMapper = valueMapper;
}
/** {@inheritDoc} */
@@ -62,11 +79,10 @@ public class KeyValueViewImpl<K, V> extends AbstractTableView implements KeyValu
final KVSerializer<K, V> marsh = marshaller();
- Row kRow = marsh.serialize(key, null); // Convert to portable format to pass TX/storage layer.
+ BinaryRow kRow = marsh.serialize(key, null); // Convert to portable format to pass TX/storage layer.
return tbl.get(kRow, tx)
- .thenApply(this::wrap) // Binary -> schema-aware row
- .thenApply(marsh::deserializeValue); // row -> deserialized obj.
+ .thenApply(v -> v == null ? null : marsh.deserializeValue(v)); // row -> deserialized obj.
}
/** {@inheritDoc} */
@@ -91,12 +107,19 @@ public class KeyValueViewImpl<K, V> extends AbstractTableView implements KeyValu
/** {@inheritDoc} */
@Override public void put(@NotNull K key, V val) {
- throw new UnsupportedOperationException("Not implemented yet.");
+ sync(putAsync(key, val));
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Void> putAsync(@NotNull K key, V val) {
- throw new UnsupportedOperationException("Not implemented yet.");
+ Objects.requireNonNull(key);
+
+ final KVSerializer<K, V> marsh = marshaller();
+
+ BinaryRow kRow = marsh.serialize(key, val); // Convert to portable format to pass TX/storage layer.
+
+ return tbl.upsert(kRow, tx).thenAccept(ignore -> {
+ });
}
/** {@inheritDoc} */
@@ -239,7 +262,35 @@ public class KeyValueViewImpl<K, V> extends AbstractTableView implements KeyValu
* @return Marshaller.
*/
private KVSerializer<K, V> marshaller() {
- throw new UnsupportedOperationException("Not implemented yet.");
+ SerializerFactory factory = SerializerFactory.createJavaSerializerFactory();
+
+ return new KVSerializer<K, V>() {
+ Serializer s = factory.create(schemaReg.schema(), keyMapper.getType(), valueMapper.getType());
+
+ @Override public BinaryRow serialize(@NotNull K key, V val) {
+ try {
+ return new ByteBufferRow(ByteBuffer.wrap(s.serialize(key, val)).order(ByteOrder.LITTLE_ENDIAN));
+ } catch (SerializationException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ @NotNull @Override public K deserializeKey(@NotNull BinaryRow row) {
+ try {
+ return s.deserializeKey(row.bytes());
+ } catch (SerializationException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ @Nullable @Override public V deserializeValue(@NotNull BinaryRow row) {
+ try {
+ return s.deserializeValue(row.bytes());
+ } catch (SerializationException e) {
+ throw new IgniteException(e);
+ }
+ }
+ };
}
/**
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueOperationsTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueOperationsTest.java
new file mode 100644
index 0000000..12fc840
--- /dev/null
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueOperationsTest.java
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
+import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.mapper.Mapper;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Basic table operations test.
+ * <p>
+ * TODO: IGNITE-14487 Add bulk operations tests.
+ * TODO: IGNITE-14487 Add async operations tests.
+ */
+public class KeyValueOperationsTest {
+ /**
+ *
+ */
+ @Test
+ public void put() {
+ SchemaDescriptor schema = new SchemaDescriptor(
+ 1,
+ new Column[] {new Column("id", NativeTypes.INT64, false)},
+ new Column[] {new Column("val", NativeTypes.INT64, false)}
+ );
+
+ Mapper<Long> mapper = new Mapper<>() {
+ @Override public Class<Long> getType() {
+ return Long.class;
+ }
+ };
+
+ KeyValueView<Long, Long> tbl =
+ new KeyValueViewImpl<>(new DummyInternalTableImpl(), new DummySchemaManagerImpl(schema), mapper, mapper, null);
+
+ assertNull(tbl.get(1L));
+
+ // Put KV pair.
+ tbl.put(1L, 11L);
+
+ assertEquals(11L, tbl.get(1L));
+ assertEquals(11L, tbl.get(1L));
+
+ // Update KV pair.
+ tbl.put(1L, 22L);
+
+ assertEquals(22L, tbl.get(1L));
+ assertEquals( 22L, tbl.get(1L));
+
+ // Remove KV pair.
+ tbl.put(1L, null);
+
+ assertNull(tbl.get(1L));
+
+ // Put KV pair.
+ tbl.put(1L, 33L);
+ assertEquals(33L, tbl.get(1L));
+ }
+
+ /**
+ *
+ */
+ @Test
+ public void putIfAbsent() {
+ SchemaDescriptor schema = new SchemaDescriptor(
+ 1,
+ new Column[] {new Column("id", NativeTypes.INT64, false)},
+ new Column[] {new Column("val", NativeTypes.INT64, false)}
+ );
+
+ KeyValueView<Tuple, Tuple> tbl =
+ new KeyValueBinaryViewImpl(new DummyInternalTableImpl(), new DummySchemaManagerImpl(schema), null, null);
+
+ final Tuple key = Tuple.create().set("id", 1L);
+ final Tuple val = Tuple.create().set("val", 11L);
+ final Tuple val2 = Tuple.create().set("val", 22L);
+
+ assertNull(tbl.get(key));
+
+ // Insert new KV pair.
+ assertTrue(tbl.putIfAbsent(key, val));
+
+ assertEqualsValues(schema, val, tbl.get(key));
+ assertEqualsValues(schema, val, tbl.get(Tuple.create().set("id", 1L)));
+
+ // Update KV pair.
+ assertFalse(tbl.putIfAbsent(key, val2));
+
+ assertEqualsValues(schema, val, tbl.get(key));
+ assertEqualsValues(schema, val, tbl.get(Tuple.create().set("id", 1L)));
+ }
+
+ /**
+ *
+ */
+ @Test
+ public void getAndPut() {
+ SchemaDescriptor schema = new SchemaDescriptor(
+ 1,
+ new Column[] {new Column("id", NativeTypes.INT64, false)},
+ new Column[] {new Column("val", NativeTypes.INT64, false)}
+ );
+
+ KeyValueView<Tuple, Tuple> tbl =
+ new KeyValueBinaryViewImpl(new DummyInternalTableImpl(), new DummySchemaManagerImpl(schema), null, null);
+
+ final Tuple key = Tuple.create().set("id", 1L);
+ final Tuple val = Tuple.create().set("val", 11L);
+ final Tuple val2 = Tuple.create().set("val", 22L);
+ final Tuple val3 = Tuple.create().set("val", 33L);
+
+ assertNull(tbl.get(key));
+
+ // Insert new tuple.
+ assertNull(tbl.getAndPut(key, val));
+
+ assertEqualsValues(schema, val, tbl.get(key));
+ assertEqualsValues(schema, val, tbl.get(Tuple.create().set("id", 1L)));
+
+ assertEqualsValues(schema, val, tbl.getAndPut(key, val2));
+ assertEqualsValues(schema, val2, tbl.getAndPut(key, Tuple.create().set("val", 33L)));
+
+ assertEqualsValues(schema, val3, tbl.get(key));
+ assertNull(tbl.get(Tuple.create().set("id", 2L)));
+ }
+
+ /**
+ *
+ */
+ @Test
+ public void remove() {
+ SchemaDescriptor schema = new SchemaDescriptor(
+ 1,
+ new Column[] {new Column("id", NativeTypes.INT64, false)},
+ new Column[] {new Column("val", NativeTypes.INT64, false)}
+ );
+
+ KeyValueView<Tuple, Tuple> tbl =
+ new KeyValueBinaryViewImpl(new DummyInternalTableImpl(), new DummySchemaManagerImpl(schema), null, null);
+
+ final Tuple key = Tuple.create().set("id", 1L);
+ final Tuple key2 = Tuple.create().set("id", 2L);
+ final Tuple val = Tuple.create().set("val", 11L);
+ final Tuple val2 = Tuple.create().set("val", 22L);
+
+ // Put KV pair.
+ tbl.put(key, val);
+
+ // Delete existed key.
+ assertEqualsValues(schema, val, tbl.get(key));
+ assertTrue(tbl.remove(key));
+ assertNull(tbl.get(key));
+
+ // Delete already deleted key.
+ assertFalse(tbl.remove(key));
+
+ // Put KV pair.
+ tbl.put(key, val2);
+ assertEqualsValues(schema, val2, tbl.get(key));
+
+ // Delete existed key.
+ assertTrue(tbl.remove(Tuple.create().set("id", 1L)));
+ assertNull(tbl.get(key));
+
+ // Delete not existed key.
+ assertNull(tbl.get(key2));
+ assertFalse(tbl.remove(key2));
+ }
+
+ /**
+ *
+ */
+ @Test
+ public void removeExact() {
+ SchemaDescriptor schema = new SchemaDescriptor(
+ 1,
+ new Column[] {new Column("id", NativeTypes.INT64, false)},
+ new Column[] {new Column("val", NativeTypes.INT64, false)}
+ );
+
+ final KeyValueView<Tuple, Tuple> tbl =
+ new KeyValueBinaryViewImpl(new DummyInternalTableImpl(), new DummySchemaManagerImpl(schema), null, null);
+
+ final Tuple key = Tuple.create().set("id", 1L);
+ final Tuple key2 = Tuple.create().set("id", 2L);
+ final Tuple val = Tuple.create().set("val", 11L);
+ final Tuple val2 = Tuple.create().set("val", 22L);
+
+ // Put KV pair.
+ tbl.put(key, val);
+ assertEqualsValues(schema, val, tbl.get(key));
+
+ // Fails to delete KV pair with unexpected value.
+ assertFalse(tbl.remove(key, val2));
+ assertEqualsValues(schema, val, tbl.get(key));
+
+ // Delete KV pair with expected value.
+ assertTrue(tbl.remove(key, val));
+ assertNull(tbl.get(key));
+
+ // Once again.
+ assertFalse(tbl.remove(key, val));
+ assertNull(tbl.get(key));
+
+ // Try to remove non-existed key.
+ assertThrows(Exception.class, () -> tbl.remove(key, null));
+ assertNull(tbl.get(key));
+
+ // Put KV pair.
+ tbl.put(key, val2);
+ assertEqualsValues(schema, val2, tbl.get(key));
+
+ // Check null value ignored.
+ assertThrows(Exception.class, () -> tbl.remove(key, null));
+ assertEqualsValues(schema, val2, tbl.get(key));
+
+ // Delete KV pair with expected value.
+ assertTrue(tbl.remove(key, val2));
+ assertNull(tbl.get(key));
+
+ assertFalse(tbl.remove(key2, val2));
+ assertNull(tbl.get(key2));
+ }
+
+ /**
+ *
+ */
+ @Test
+ public void replace() {
+ SchemaDescriptor schema = new SchemaDescriptor(
+ 1,
+ new Column[] {new Column("id", NativeTypes.INT64, false)},
+ new Column[] {new Column("val", NativeTypes.INT64, false)}
+ );
+
+ KeyValueView<Tuple, Tuple> tbl =
+ new KeyValueBinaryViewImpl(new DummyInternalTableImpl(), new DummySchemaManagerImpl(schema), null, null);
+
+ final Tuple key = Tuple.create().set("id", 1L);
+ final Tuple key2 = Tuple.create().set("id", 2L);
+ final Tuple val = Tuple.create().set("val", 11L);
+ final Tuple val2 = Tuple.create().set("val", 22L);
+ final Tuple val3 = Tuple.create().set("val", 33L);
+
+ // Ignore replace operation for non-existed KV pair.
+ assertFalse(tbl.replace(key, val));
+ assertNull(tbl.get(key));
+
+ tbl.put(key, val);
+
+ // Replace existed KV pair.
+ assertTrue(tbl.replace(key, val2));
+ assertEqualsValues(schema, val2, tbl.get(key));
+
+ // Remove existed KV pair.
+ assertTrue(tbl.replace(key, null));
+ assertNull(tbl.get(key));
+
+ // Ignore replace operation for non-existed KV pair.
+ assertFalse(tbl.replace(key, val3));
+ assertNull(tbl.get(key));
+
+ tbl.put(key, val3);
+ assertEqualsValues(schema, val3, tbl.get(key));
+
+ // Remove non-existed KV pair.
+ assertFalse(tbl.replace(key2, null));
+ assertNull(tbl.get(key2));
+ }
+
+ /**
+ *
+ */
+ @Test
+ public void replaceExact() {
+ SchemaDescriptor schema = new SchemaDescriptor(
+ 1,
+ new Column[] {new Column("id", NativeTypes.INT64, false)},
+ new Column[] {new Column("val", NativeTypes.INT64, false)}
+ );
+
+ KeyValueView<Tuple, Tuple> tbl =
+ new KeyValueBinaryViewImpl(new DummyInternalTableImpl(), new DummySchemaManagerImpl(schema), null, null);
+
+ final Tuple key = Tuple.create().set("id", 1L);
+ final Tuple key2 = Tuple.create().set("id", 2L);
+ final Tuple val = Tuple.create().set("val", 11L);
+ final Tuple val2 = Tuple.create().set("val", 22L);
+ final Tuple val3 = Tuple.create().set("val", 33L);
+
+ // Insert KV pair.
+ assertTrue(tbl.replace(key, null, val));
+ assertEqualsValues(schema, val, tbl.get(key));
+ assertNull(tbl.get(key2));
+
+ // Ignore replace operation for non-existed KV pair.
+ assertFalse(tbl.replace(key2, val, val2));
+ assertNull(tbl.get(key2));
+
+ // Replace existed KV pair.
+ assertTrue(tbl.replace(key, val, val2));
+ assertEqualsValues(schema, val2, tbl.get(key));
+
+ // Remove existed KV pair.
+ assertTrue(tbl.replace(key, val2, null));
+ assertNull(tbl.get(key));
+
+ // Insert KV pair.
+ assertTrue(tbl.replace(key, null, val3));
+ assertEqualsValues(schema, val3, tbl.get(key));
+
+ // Remove non-existed KV pair.
+ assertTrue(tbl.replace(key2, null, null));
+ }
+
+ /**
+ * Check key columns equality.
+ *
+ * @param schema Schema.
+ * @param expected Expected tuple.
+ * @param actual Actual tuple.
+ */
+ void assertEqualsKeys(SchemaDescriptor schema, Tuple expected, Tuple actual) {
+ int nonNullKey = 0;
+
+ for (int i = 0; i < schema.keyColumns().length(); i++) {
+ final Column col = schema.keyColumns().column(i);
+
+ final Object val1 = expected.value(col.name());
+ final Object val2 = actual.value(col.name());
+
+ assertEquals(val1, val2, "Value columns equality check failed: colIdx=" + col.schemaIndex());
+
+ if (schema.isKeyColumn(i) && val1 != null)
+ nonNullKey++;
+ }
+
+ assertTrue(nonNullKey > 0, "At least one non-null key column must exist.");
+ }
+
+ /**
+ * Check value columns equality.
+ *
+ * @param schema Schema.
+ * @param expected Expected tuple.
+ * @param actual Actual tuple.
+ */
+ void assertEqualsValues(SchemaDescriptor schema, Tuple expected, Tuple actual) {
+ for (int i = 0; i < schema.valueColumns().length(); i++) {
+ final Column col = schema.valueColumns().column(i);
+
+ final Object val1 = expected.value(col.name());
+ final Object val2 = actual.value(col.name());
+
+ assertEquals(val1, val2, "Key columns equality check failed: colIdx=" + col.schemaIndex());
+ }
+ }
+}