You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2021/11/01 14:42:14 UTC

[ignite-3] branch main updated: IGNITE-14291: Implement KeyValueView API (#416)

This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 8e79100  IGNITE-14291: Implement KeyValueView API (#416)
8e79100 is described below

commit 8e79100da98eb61bec57c59f339ef5dd8c3f0111
Author: Andrew V. Mashenkov <AM...@users.noreply.github.com>
AuthorDate: Mon Nov 1 17:42:06 2021 +0300

    IGNITE-14291: Implement KeyValueView API (#416)
---
 .../main/java/org/apache/ignite/table/Table.java   |   8 +-
 .../org/apache/ignite/table/mapper/KeyMapper.java  |  26 --
 .../table/mapper/{ValueMapper.java => Mapper.java} |  43 +--
 .../org/apache/ignite/table/mapper/Mappers.java    |  12 +-
 .../apache/ignite/table/mapper/RecordMapper.java   |  61 -----
 .../ignite/internal/client/table/ClientTable.java  |   8 +-
 .../schema/marshaller/AbstractSerializer.java      |  71 +----
 .../internal/schema/marshaller/Serializer.java     |  24 +-
 .../schema/marshaller/SerializerFactory.java       |   6 +-
 .../marshaller/asm/AsmSerializerGenerator.java     |  54 ++--
 .../marshaller/reflection/JavaSerializer.java      |  70 +++--
 .../schema/marshaller/reflection/Marshaller.java   |   8 +-
 .../ignite/internal/schema/row/VarTableFormat.java |   6 +-
 .../benchmarks/SerializerBenchmarkTest.java        |   8 +-
 .../schema/marshaller/JavaSerializerTest.java      |  36 +--
 .../{KVSerializer.java => KVMarshaller.java}       |  30 +-
 .../ignite/internal/table/KeyValueViewImpl.java    | 204 ++++++++++----
 .../ignite/internal/table/RecordViewImpl.java      |   4 +-
 .../apache/ignite/internal/table/TableImpl.java    |   8 +-
 .../table/KeyValueBinaryViewOperationsTest.java    | 111 ++++----
 .../internal/table/KeyValueOperationsTest.java     | 301 +++++++++++++++++++++
 21 files changed, 684 insertions(+), 415 deletions(-)

diff --git a/modules/api/src/main/java/org/apache/ignite/table/Table.java b/modules/api/src/main/java/org/apache/ignite/table/Table.java
index cd5be9e..14e6f75 100644
--- a/modules/api/src/main/java/org/apache/ignite/table/Table.java
+++ b/modules/api/src/main/java/org/apache/ignite/table/Table.java
@@ -17,10 +17,8 @@
 
 package org.apache.ignite.table;
 
-import org.apache.ignite.table.mapper.KeyMapper;
+import org.apache.ignite.table.mapper.Mapper;
 import org.apache.ignite.table.mapper.Mappers;
-import org.apache.ignite.table.mapper.RecordMapper;
-import org.apache.ignite.table.mapper.ValueMapper;
 import org.jetbrains.annotations.NotNull;
 
 /**
@@ -47,7 +45,7 @@ public interface Table {
      * @param <R> Record type.
      * @return Table record view.
      */
-    <R> RecordView<R> recordView(RecordMapper<R> recMapper);
+    <R> RecordView<R> recordView(Mapper<R> recMapper);
 
     /**
      * Creates record view of table regarding the binary object concept.
@@ -65,7 +63,7 @@ public interface Table {
      * @param <V> Value type.
      * @return Table key-value view.
      */
-    <K, V> KeyValueView<K, V> keyValueView(KeyMapper<K> keyMapper, ValueMapper<V> valMapper);
+    <K, V> KeyValueView<K, V> keyValueView(Mapper<K> keyMapper, Mapper<V> valMapper);
 
     /**
      * Creates key-value view of table regarding the binary object concept.
diff --git a/modules/api/src/main/java/org/apache/ignite/table/mapper/KeyMapper.java b/modules/api/src/main/java/org/apache/ignite/table/mapper/KeyMapper.java
deleted file mode 100644
index 8b48507..0000000
--- a/modules/api/src/main/java/org/apache/ignite/table/mapper/KeyMapper.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.table.mapper;
-
-/**
- * Key mapper interface.
- *
- * @param <K> Key type.
- */
-public interface KeyMapper<K> {
-}
diff --git a/modules/api/src/main/java/org/apache/ignite/table/mapper/ValueMapper.java b/modules/api/src/main/java/org/apache/ignite/table/mapper/Mapper.java
similarity index 71%
rename from modules/api/src/main/java/org/apache/ignite/table/mapper/ValueMapper.java
rename to modules/api/src/main/java/org/apache/ignite/table/mapper/Mapper.java
index 2a9e900..8bde79a 100644
--- a/modules/api/src/main/java/org/apache/ignite/table/mapper/ValueMapper.java
+++ b/modules/api/src/main/java/org/apache/ignite/table/mapper/Mapper.java
@@ -21,25 +21,24 @@ import java.util.function.Function;
 import org.apache.ignite.table.Tuple;
 
 /**
- * Value mapper interface.
+ * Mapper interface defines methods that are required for a marshaller to map class field names to table columns.
  *
- * @param <V> Value type.
+ * @param <T> Mapped type.
  */
-public interface ValueMapper<V> {
+public interface Mapper<T> {
     /**
-     * Value mapper builder.
+     * Return mapped type.
      *
-     * @param <V> Value type.
+     * @return Mapped type.
      */
-    public interface Builder<V> {
-        /**
-         * Sets a target class to deserialize to.
-         *
-         * @param targetClass Target class.
-         * @return {@code this} for chaining.
-         */
-        public Builder<V> deserializeTo(Class<?> targetClass);
+    Class<T> getType();
 
+    /**
+     * Mapper builder.
+     *
+     * @param <T> Mapped type.
+     */
+    interface Builder<T> {
         /**
          * Map a field to a type of given class.
          *
@@ -47,23 +46,31 @@ public interface ValueMapper<V> {
          * @param targetClass Target class.
          * @return {@code this} for chaining.
          */
-        public Builder<V> map(String fieldName, Class<?> targetClass);
+        Builder<T> map(String fieldName, Class<?> targetClass);
 
         /**
          * Adds a functional mapping for a field,
          * the result depends on function call for every particular row.
          *
          * @param fieldName Field name.
-         * @param mapperFunction Mapper function.
+         * @param mappingFunction Mapper function.
+         * @return {@code this} for chaining.
+         */
+        Builder<T> map(String fieldName, Function<Tuple, Object> mappingFunction);
+
+        /**
+         * Sets a target class to deserialize to.
+         *
+         * @param targetClass Target class.
          * @return {@code this} for chaining.
          */
-        public Builder<V> map(String fieldName, Function<Tuple, Object> mapperFunction);
+        Builder<T> deserializeTo(Class<?> targetClass);
 
         /**
-         * Builds value mapper.
+         * Builds mapper.
          *
          * @return Mapper.
          */
-        public ValueMapper<V> build();
+        Mapper<T> build();
     }
 }
diff --git a/modules/api/src/main/java/org/apache/ignite/table/mapper/Mappers.java b/modules/api/src/main/java/org/apache/ignite/table/mapper/Mappers.java
index 557efff..c4bc6cc 100644
--- a/modules/api/src/main/java/org/apache/ignite/table/mapper/Mappers.java
+++ b/modules/api/src/main/java/org/apache/ignite/table/mapper/Mappers.java
@@ -28,7 +28,7 @@ public final class Mappers {
      * @param <K> Key type.
      * @return Mapper for key.
      */
-    public static <K> KeyMapper<K> ofKeyClass(Class<K> cls) {
+    public static <K> Mapper<K> ofKeyClass(Class<K> cls) {
         return null;
     }
 
@@ -39,7 +39,7 @@ public final class Mappers {
      * @param <V> Value type.
      * @return Mapper for value.
      */
-    public static <V> ValueMapper<V> ofValueClass(Class<V> cls) {
+    public static <V> Mapper<V> ofValueClass(Class<V> cls) {
         return null;
     }
 
@@ -50,7 +50,7 @@ public final class Mappers {
      * @param <V> Value type.
      * @return Mapper builder for value.
      */
-    public static <V> ValueMapper.Builder<V> ofValueClassBuilder(Class<V> cls) {
+    public static <V> Mapper.Builder<V> ofValueClassBuilder(Class<V> cls) {
         return null;
     }
 
@@ -60,7 +60,7 @@ public final class Mappers {
      * @param <R> Record type.
      * @return Identity key mapper.
      */
-    public static <R> KeyMapper<R> identity() {
+    public static <R> Mapper<R> identity() {
         return null;
     }
 
@@ -71,7 +71,7 @@ public final class Mappers {
      * @param <R> Record type.
      * @return Mapper builder for record.
      */
-    public static <R> RecordMapper<R> ofRecordClass(Class<R> cls) {
+    public static <R> Mapper<R> ofRecordClass(Class<R> cls) {
         return null;
     }
 
@@ -82,7 +82,7 @@ public final class Mappers {
      * @param <R> Record type.
      * @return Mapper builder for record.
      */
-    public static <R> RecordMapper.Builder<R> ofRecordClassBuilder(Class<R> cls) {
+    public static <R> Mapper.Builder<R> ofRecordClassBuilder(Class<R> cls) {
         return null;
     }
 
diff --git a/modules/api/src/main/java/org/apache/ignite/table/mapper/RecordMapper.java b/modules/api/src/main/java/org/apache/ignite/table/mapper/RecordMapper.java
deleted file mode 100644
index fb7b52f..0000000
--- a/modules/api/src/main/java/org/apache/ignite/table/mapper/RecordMapper.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.table.mapper;
-
-import java.util.function.Function;
-import org.apache.ignite.table.Tuple;
-
-/**
- * Record mapper interface.
- *
- * @param <R> Record type.
- */
-public interface RecordMapper<R> {
-    /**
-     * Record mapper builder.
-     *
-     * @param <R> Record type.
-     */
-    public interface Builder<R> {
-        /**
-         * Map a field to a type of given class.
-         *
-         * @param fieldName Field name.
-         * @param targetClass Target class.
-         * @return {@code this} for chaining.
-         */
-        public Builder<R> map(String fieldName, Class<?> targetClass);
-
-        /**
-         * Adds a functional mapping for a field,
-         * the result depends on function call for every particular row.
-         *
-         * @param fieldName Field name.
-         * @param mappingFunction Mapper function.
-         * @return {@code this} for chaining.
-         */
-        public Builder<R> map(String fieldName, Function<Tuple, Object> mappingFunction);
-
-        /**
-         * Builds record mapper.
-         *
-         * @return Mapper.
-         */
-        public RecordMapper<R> build();
-    }
-}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
index 188f775..faae660 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
@@ -39,9 +39,7 @@ import org.apache.ignite.table.KeyValueView;
 import org.apache.ignite.table.RecordView;
 import org.apache.ignite.table.Table;
 import org.apache.ignite.table.Tuple;
-import org.apache.ignite.table.mapper.KeyMapper;
-import org.apache.ignite.table.mapper.RecordMapper;
-import org.apache.ignite.table.mapper.ValueMapper;
+import org.apache.ignite.table.mapper.Mapper;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.msgpack.core.MessageFormat;
@@ -100,12 +98,12 @@ public class ClientTable implements Table {
     }
 
     /** {@inheritDoc} */
-    @Override public <R> RecordView<R> recordView(RecordMapper<R> recMapper) {
+    @Override public <R> RecordView<R> recordView(Mapper<R> recMapper) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     /** {@inheritDoc} */
-    @Override public <K, V> KeyValueView<K, V> keyValueView(KeyMapper<K> keyMapper, ValueMapper<V> valMapper) {
+    @Override public <K, V> KeyValueView<K, V> keyValueView(Mapper<K> keyMapper, Mapper<V> valMapper) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/AbstractSerializer.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/AbstractSerializer.java
index 31e8f12..f6a8f9e 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/AbstractSerializer.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/AbstractSerializer.java
@@ -17,13 +17,7 @@
 
 package org.apache.ignite.internal.schema.marshaller;
 
-import java.util.Objects;
-import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
-import org.apache.ignite.internal.schema.row.Row;
-import org.apache.ignite.internal.schema.row.RowAssembler;
-import org.apache.ignite.internal.util.Pair;
-import org.jetbrains.annotations.Nullable;
 
 /**
  * Base serializer class.
@@ -42,68 +36,7 @@ public abstract class AbstractSerializer implements Serializer {
     }
 
     /** {@inheritDoc} */
-    @Override public byte[] serialize(Object key, Object val) throws SerializationException {
-        final RowAssembler assembler = createAssembler(Objects.requireNonNull(key), val);
-
-        return serialize0(assembler, key, val);
-    }
-
-    /** {@inheritDoc} */
-    @Override public <K> K deserializeKey(byte[] data) throws SerializationException {
-        final Row row = new Row(schema, new ByteBufferRow(data));
-
-        return (K)deserializeKey0(row);
+    @Override public SchemaDescriptor schema() {
+        return schema;
     }
-
-    /** {@inheritDoc} */
-    @Override public <V> V deserializeValue(byte[] data) throws SerializationException {
-        final Row row = new Row(schema, new ByteBufferRow(data));
-
-        return (V)deserializeValue0(row);
-    }
-
-    /** {@inheritDoc} */
-    @Override public <K, V> Pair<K, V> deserialize(byte[] data) throws SerializationException {
-        final Row row = new Row(schema, new ByteBufferRow(data));
-
-        return new Pair<>((K)deserializeKey0(row), (V)deserializeValue0(row));
-    }
-
-    /**
-     * Row assembler factory method.
-     *
-     * @param key Key object.
-     * @param val Value object.
-     * @return Row assembler.
-     */
-    protected abstract RowAssembler createAssembler(Object key, @Nullable Object val);
-
-    /**
-     * Internal serialization method.
-     *
-     * @param asm Row assembler.
-     * @param key Key object.
-     * @param val Value object.
-     * @return Serialized pair.
-     * @throws SerializationException If failed.
-     */
-    protected abstract byte[] serialize0(RowAssembler asm, Object key, Object val) throws SerializationException;
-
-    /**
-     * Extract key object from row.
-     *
-     * @param row Row.
-     * @return Deserialized key object.
-     * @throws SerializationException If failed.
-     */
-    protected abstract Object deserializeKey0(Row row) throws SerializationException;
-
-    /**
-     * Extract value object from row.
-     *
-     * @param row Row.
-     * @return Deserialized value object.
-     * @throws SerializationException If failed.
-     */
-    protected abstract Object deserializeValue0(Row row) throws SerializationException;
 }
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/Serializer.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/Serializer.java
index d9ad99d..3b3eedb 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/Serializer.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/Serializer.java
@@ -17,6 +17,9 @@
 
 package org.apache.ignite.internal.schema.marshaller;
 
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.util.Pair;
 
 /**
@@ -28,33 +31,38 @@ public interface Serializer {
      *
      * @param key Key object.
      * @param val Value object.
-     * @return Serialized key-value pair.
+     * @return Binary row.
      * @throws SerializationException If serialization failed.
      */
-    byte[] serialize(Object key, Object val) throws SerializationException;
+    BinaryRow serialize(Object key, Object val) throws SerializationException;
 
     /**
-     * @param data Key bytes.
+     * @param row Row.
      * @param <K> Key object type.
      * @return Key object.
      * @throws SerializationException If deserialization failed.
      */
-    <K> K deserializeKey(byte[] data) throws SerializationException;
+    <K> K deserializeKey(Row row) throws SerializationException;
 
     /**
-     * @param data Value bytes.
+     * @param row Row.
      * @param <V> Value object type.
      * @return Value object.
      * @throws SerializationException If deserialization failed.
      */
-    <V> V deserializeValue(byte[] data) throws SerializationException;
+    <V> V deserializeValue(Row row) throws SerializationException;
 
     /**
-     * @param data Row bytes.
+     * @param row Row.
      * @param <K> Key object type.
      * @param <V> Value object type.
      * @return Key-value pair.
      * @throws SerializationException If deserialization failed.
      */
-    <K, V> Pair<K,V> deserialize(byte[] data) throws SerializationException;
+    <K, V> Pair<K, V> deserialize(Row row) throws SerializationException;
+
+    /**
+     * @return Schema.
+     */
+    SchemaDescriptor schema();
 }
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/SerializerFactory.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/SerializerFactory.java
index bbc173c..ca601d5 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/SerializerFactory.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/SerializerFactory.java
@@ -29,14 +29,14 @@ public interface SerializerFactory {
     /**
      * @return Serializer factory back by code generator.
      */
-    public static SerializerFactory createGeneratedSerializerFactory() {
+    static SerializerFactory createGeneratedSerializerFactory() {
         return new AsmSerializerGenerator();
     }
 
     /**
      * @return Reflection-based serializer factory.
      */
-    public static SerializerFactory createJavaSerializerFactory() {
+    static SerializerFactory createJavaSerializerFactory() {
         return new JavaSerializerFactory();
     }
 
@@ -48,5 +48,5 @@ public interface SerializerFactory {
      * @param valClass Value class.
      * @return Serializer.
      */
-    public Serializer create(SchemaDescriptor schema, Class<?> keyClass, Class<?> valClass);
+    Serializer create(SchemaDescriptor schema, Class<?> keyClass, Class<?> valClass);
 }
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/asm/AsmSerializerGenerator.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/asm/AsmSerializerGenerator.java
index 027bf68..0eb22b0 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/asm/AsmSerializerGenerator.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/asm/AsmSerializerGenerator.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.schema.marshaller.asm;
 
 import java.io.StringWriter;
-import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.processing.Generated;
@@ -34,12 +33,11 @@ import com.facebook.presto.bytecode.Scope;
 import com.facebook.presto.bytecode.Variable;
 import com.facebook.presto.bytecode.control.IfStatement;
 import com.facebook.presto.bytecode.control.TryCatch;
-import com.facebook.presto.bytecode.expression.BytecodeExpression;
 import com.facebook.presto.bytecode.expression.BytecodeExpressions;
 import jdk.jfr.Experimental;
-import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.schema.Columns;
-import org.apache.ignite.internal.schema.NativeType;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.marshaller.AbstractSerializer;
 import org.apache.ignite.internal.schema.marshaller.BinaryMode;
@@ -233,13 +231,12 @@ public class AsmSerializerGenerator implements SerializerFactory {
         MarshallerCodeGenerator valMarsh
     ) {
         final MethodDefinition methodDef = classDef.declareMethod(
-            EnumSet.of(Access.PROTECTED),
+            EnumSet.of(Access.PRIVATE),
             "createAssembler",
             ParameterizedType.type(RowAssembler.class),
             Parameter.arg("key", Object.class),
             Parameter.arg("val", Object.class)
         );
-        methodDef.declareAnnotation(Override.class);
 
         final Scope scope = methodDef.getScope();
         final BytecodeBlock body = methodDef.getBody();
@@ -306,19 +303,23 @@ public class AsmSerializerGenerator implements SerializerFactory {
         MarshallerCodeGenerator valMarsh
     ) {
         final MethodDefinition methodDef = classDef.declareMethod(
-            EnumSet.of(Access.PROTECTED),
-            "serialize0",
-            ParameterizedType.type(byte[].class),
-            Parameter.arg("asm", RowAssembler.class),
+            EnumSet.of(Access.PUBLIC),
+            "serialize",
+            ParameterizedType.type(BinaryRow.class),
             Parameter.arg("key", Object.class),
             Parameter.arg("val", Object.class)
         ).addException(SerializationException.class);
 
         methodDef.declareAnnotation(Override.class);
 
-        final Variable asm = methodDef.getScope().getVariable("asm");
+        final Variable asm = methodDef.getScope().createTempVariable(RowAssembler.class);
 
-        methodDef.getBody().append(new IfStatement().condition(BytecodeExpressions.isNull(asm)).ifTrue(
+        methodDef.getBody()
+            .append(asm.set(methodDef.getScope().getThis().invoke("createAssembler",
+                RowAssembler.class,
+                methodDef.getScope().getVariable("key"),
+                methodDef.getScope().getVariable("val"))))
+            .append(new IfStatement().condition(BytecodeExpressions.isNull(asm)).ifTrue(
             new BytecodeBlock()
                 .append(BytecodeExpressions.newInstance(IgniteInternalException.class, BytecodeExpressions.constantString("ASM can't be null.")))
                 .throwObject()
@@ -337,7 +338,8 @@ public class AsmSerializerGenerator implements SerializerFactory {
                     asm,
                     methodDef.getScope().getVariable("val"))
             )
-            .append(asm.invoke("toBytes", byte[].class))
+            .append(BytecodeExpressions.newInstance(ByteBufferRow.class,
+                asm.invoke("toBytes", byte[].class)))
             .retObject();
 
         final Variable ex = methodDef.getScope().createTempVariable(Throwable.class);
@@ -360,8 +362,8 @@ public class AsmSerializerGenerator implements SerializerFactory {
      */
     private void generateDeserializeKeyMethod(ClassDefinition classDef, MarshallerCodeGenerator keyMarsh) {
         final MethodDefinition methodDef = classDef.declareMethod(
-            EnumSet.of(Access.PROTECTED),
-            "deserializeKey0",
+            EnumSet.of(Access.PUBLIC),
+            "deserializeKey",
             ParameterizedType.type(Object.class),
             Parameter.arg("row", Row.class)
         ).addException(SerializationException.class);
@@ -388,8 +390,8 @@ public class AsmSerializerGenerator implements SerializerFactory {
      */
     private void generateDeserializeValueMethod(ClassDefinition classDef, MarshallerCodeGenerator valMarsh) {
         final MethodDefinition methodDef = classDef.declareMethod(
-            EnumSet.of(Access.PROTECTED),
-            "deserializeValue0",
+            EnumSet.of(Access.PUBLIC),
+            "deserializeValue",
             ParameterizedType.type(Object.class),
             Parameter.arg("row", Row.class)
         ).addException(SerializationException.class);
@@ -411,24 +413,6 @@ public class AsmSerializerGenerator implements SerializerFactory {
     }
 
     /**
-     * Generates column size expression.
-     *
-     * @param obj Target object.
-     * @param cols columns.
-     * @param colIdx Column index.
-     * @return Expression.
-     */
-    private BytecodeExpression getColumnValueSize(Variable obj, Variable cols, int colIdx) {
-        return BytecodeExpressions.invokeStatic(MarshallerUtil.class, "getValueSize",
-            int.class,
-            Arrays.asList(Object.class, NativeType.class),
-            obj,
-            cols.invoke("column", Column.class, BytecodeExpressions.constantInt(colIdx))
-                .invoke("type", NativeType.class)
-        );
-    }
-
-    /**
      * Resolves current classloader.
      *
      * @return Classloader.
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/JavaSerializer.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/JavaSerializer.java
index 94bfd0a..7ef1e51 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/JavaSerializer.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/JavaSerializer.java
@@ -17,12 +17,16 @@
 
 package org.apache.ignite.internal.schema.marshaller.reflection;
 
+import java.util.Objects;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.schema.Columns;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.marshaller.AbstractSerializer;
 import org.apache.ignite.internal.schema.marshaller.SerializationException;
 import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.schema.row.RowAssembler;
+import org.apache.ignite.internal.util.Pair;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.schema.marshaller.MarshallerUtil.getValueSize;
@@ -55,23 +59,49 @@ public class JavaSerializer extends AbstractSerializer {
         this.keyClass = keyClass;
         this.valClass = valClass;
 
-        keyMarsh = Marshaller.createMarshaller(schema.keyColumns(), 0, keyClass);
-        valMarsh = Marshaller.createMarshaller(schema.valueColumns(), schema.keyColumns().length(), valClass);
+        keyMarsh = Marshaller.createMarshaller(schema.keyColumns(), keyClass);
+        valMarsh = Marshaller.createMarshaller(schema.valueColumns(), valClass);
     }
 
     /** {@inheritDoc} */
-    @Override protected byte[] serialize0(
-        RowAssembler asm,
-        Object key,
-        @Nullable Object val
-    ) throws SerializationException {
+    @Override public BinaryRow serialize(Object key, @Nullable Object val) throws SerializationException {
         assert keyClass.isInstance(key);
         assert val == null || valClass.isInstance(val);
 
+        final RowAssembler asm = createAssembler(Objects.requireNonNull(key), val);
+
         keyMarsh.writeObject(key, asm);
-        valMarsh.writeObject(val, asm);
 
-        return asm.toBytes();
+        if (val != null)
+            valMarsh.writeObject(val, asm);
+
+        return new ByteBufferRow(asm.toBytes());
+    }
+
+    /** {@inheritDoc} */
+    @Override public <K> K deserializeKey(Row row) throws SerializationException {
+        final Object o = keyMarsh.readObject(row);
+
+        assert keyClass.isInstance(o);
+
+        return (K)o;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <V> V deserializeValue(Row row) throws SerializationException {
+        if (!row.hasValue())
+            return null;
+
+        final Object o = valMarsh.readObject(row);
+
+        assert o == null || valClass.isInstance(o);
+
+        return (V)o;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <K, V> Pair<K, V> deserialize(Row row) throws SerializationException {
+        return new Pair<>(deserializeKey(row), deserializeValue(row));
     }
 
     /**
@@ -81,7 +111,7 @@ public class JavaSerializer extends AbstractSerializer {
      * @param val Value object.
      * @return Row assembler.
      */
-    @Override protected RowAssembler createAssembler(Object key, Object val) {
+    private RowAssembler createAssembler(Object key, Object val) {
         ObjectStatistic keyStat = collectObjectStats(schema.keyColumns(), keyMarsh, key);
         ObjectStatistic valStat = collectObjectStats(schema.valueColumns(), valMarsh, val);
 
@@ -116,30 +146,12 @@ public class JavaSerializer extends AbstractSerializer {
         return new ObjectStatistic(cnt, size);
     }
 
-    /** {@inheritDoc} */
-    @Override protected Object deserializeKey0(Row row) throws SerializationException {
-        final Object o = keyMarsh.readObject(row);
-
-        assert keyClass.isInstance(o);
-
-        return o;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected Object deserializeValue0(Row row) throws SerializationException {
-        final Object o = valMarsh.readObject(row);
-
-        assert o == null || valClass.isInstance(o);
-
-        return o;
-    }
-
     /**
      * Object statistic.
      */
     private static class ObjectStatistic {
         /** Cached zero statistics. */
-        static final ObjectStatistic ZERO_VARLEN_STATISTICS = new ObjectStatistic(0,0);
+        static final ObjectStatistic ZERO_VARLEN_STATISTICS = new ObjectStatistic(0, 0);
 
         /** Non-null columns of varlen type. */
         int nonNullCols;
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/Marshaller.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/Marshaller.java
index e43433d..f1eabbd 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/Marshaller.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/Marshaller.java
@@ -37,11 +37,10 @@ class Marshaller {
      * Creates marshaller for class.
      *
      * @param cols Columns.
-     * @param firstColId First column position in schema.
      * @param aClass Type.
      * @return Marshaller.
      */
-    static Marshaller createMarshaller(Columns cols, int firstColId, Class<? extends Object> aClass) {
+    static Marshaller createMarshaller(Columns cols, Class<? extends Object> aClass) {
         final BinaryMode mode = MarshallerUtil.mode(aClass);
 
         if (mode != null) {
@@ -51,7 +50,7 @@ class Marshaller {
             assert mode.typeSpec() == col.type().spec() : "Target type is not compatible.";
             assert !aClass.isPrimitive() : "Non-nullable types are not allowed.";
 
-            return new Marshaller(FieldAccessor.createIdentityAccessor(col, firstColId, mode));
+            return new Marshaller(FieldAccessor.createIdentityAccessor(col, col.schemaIndex(), mode));
         }
 
         FieldAccessor[] fieldAccessors = new FieldAccessor[cols.length()];
@@ -60,8 +59,7 @@ class Marshaller {
         for (int i = 0; i < cols.length(); i++) {
             final Column col = cols.column(i);
 
-            final int colIdx = firstColId + i; /* Absolute column idx in schema. */
-            fieldAccessors[i] = FieldAccessor.create(aClass, col, colIdx);
+            fieldAccessors[i] = FieldAccessor.create(aClass, col, col.schemaIndex());
         }
 
         return new Marshaller(new ObjectFactory<>(aClass), fieldAccessors);
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/VarTableFormat.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/VarTableFormat.java
index cbf8361..be0b601 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/VarTableFormat.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/VarTableFormat.java
@@ -54,12 +54,12 @@ abstract class VarTableFormat {
      * to write vartable in a compact way.
      *
      * @param payloadLen Payload size in bytes.
-     * @param valVartblLen
+     * @param vartblSize Number of items in the vartable.
      * @return Vartable format helper.
      */
-    static VarTableFormat format(int payloadLen, int valVartblLen) {
+    static VarTableFormat format(int payloadLen, int vartblSize) {
         if (payloadLen > 0) {
-            if (payloadLen < 256 && valVartblLen < 256)
+            if (payloadLen < 256 && vartblSize < 256)
                 return TINY;
 
             if (payloadLen < 64 * Constants.KiB)
diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/benchmarks/SerializerBenchmarkTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/benchmarks/SerializerBenchmarkTest.java
index 716b064..488b12a 100644
--- a/modules/schema/src/test/java/org/apache/ignite/internal/benchmarks/SerializerBenchmarkTest.java
+++ b/modules/schema/src/test/java/org/apache/ignite/internal/benchmarks/SerializerBenchmarkTest.java
@@ -30,10 +30,12 @@ import com.facebook.presto.bytecode.MethodDefinition;
 import com.facebook.presto.bytecode.ParameterizedType;
 import com.facebook.presto.bytecode.Variable;
 import com.facebook.presto.bytecode.expression.BytecodeExpressions;
+import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.marshaller.Serializer;
 import org.apache.ignite.internal.schema.marshaller.SerializerFactory;
+import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.util.Factory;
 import org.apache.ignite.internal.util.ObjectFactory;
 import org.openjdk.jmh.annotations.Benchmark;
@@ -134,10 +136,10 @@ public class SerializerBenchmarkTest {
         Long key = rnd.nextLong();
 
         Object val = objectFactory.create();
-        byte[] bytes = serializer.serialize(key, val);
+        BinaryRow row = serializer.serialize(key, val);
 
-        Object restoredKey = serializer.deserializeKey(bytes);
-        Object restoredVal = serializer.deserializeValue(bytes);
+        Object restoredKey = serializer.deserializeKey(new Row(serializer.schema(), row));
+        Object restoredVal = serializer.deserializeValue(new Row(serializer.schema(), row));
 
         bh.consume(restoredVal);
         bh.consume(restoredKey);
diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/JavaSerializerTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/JavaSerializerTest.java
index 8996fe4..ac96ea4 100644
--- a/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/JavaSerializerTest.java
+++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/JavaSerializerTest.java
@@ -37,6 +37,7 @@ import com.facebook.presto.bytecode.MethodDefinition;
 import com.facebook.presto.bytecode.ParameterizedType;
 import com.facebook.presto.bytecode.Variable;
 import com.facebook.presto.bytecode.expression.BytecodeExpressions;
+import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.NativeType;
 import org.apache.ignite.internal.schema.NativeTypeSpec;
@@ -45,6 +46,7 @@ import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.TestUtils;
 import org.apache.ignite.internal.schema.marshaller.asm.AsmSerializerGenerator;
 import org.apache.ignite.internal.schema.marshaller.reflection.JavaSerializerFactory;
+import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.internal.util.ObjectFactory;
 import org.apache.ignite.lang.IgniteInternalException;
@@ -168,11 +170,11 @@ public class JavaSerializerTest {
 
         Serializer serializer = factory.create(schema, key.getClass(), val.getClass());
 
-        byte[] bytes = serializer.serialize(key, val);
+        BinaryRow row = serializer.serialize(key, val);
 
         // Try different order.
-        Object restoredVal = serializer.deserializeValue(bytes);
-        Object restoredKey = serializer.deserializeKey(bytes);
+        Object restoredVal = serializer.deserializeValue(new Row(schema, row));
+        Object restoredKey = serializer.deserializeKey(new Row(schema, row));
 
         assertTrue(key.getClass().isInstance(restoredKey));
         assertTrue(val.getClass().isInstance(restoredVal));
@@ -248,10 +250,10 @@ public class JavaSerializerTest {
 
         Serializer serializer = factory.create(schema, key.getClass(), val.getClass());
 
-        byte[] bytes = serializer.serialize(key, val);
+        BinaryRow row = serializer.serialize(key, val);
 
-        Object key1 = serializer.deserializeKey(bytes);
-        Object val1 = serializer.deserializeValue(bytes);
+        Object key1 = serializer.deserializeKey(new Row(schema, row));
+        Object val1 = serializer.deserializeValue(new Row(schema, row));
 
         assertTrue(key.getClass().isInstance(key1));
         assertTrue(val.getClass().isInstance(val1));
@@ -295,10 +297,11 @@ public class JavaSerializerTest {
 
         final ObjectFactory<?> objFactory = new ObjectFactory<>(PrivateTestObject.class);
         final Serializer serializer = factory.create(schema, key.getClass(), val.getClass());
-        byte[] bytes = serializer.serialize(key, objFactory.create());
 
-        Object key1 = serializer.deserializeKey(bytes);
-        Object val1 = serializer.deserializeValue(bytes);
+        BinaryRow row = serializer.serialize(key, objFactory.create());
+
+        Object key1 = serializer.deserializeKey(new Row(schema, row));
+        Object val1 = serializer.deserializeValue(new Row(schema, row));
 
         assertTrue(key.getClass().isInstance(key1));
         assertTrue(val.getClass().isInstance(val1));
@@ -333,15 +336,14 @@ public class JavaSerializerTest {
 
             Serializer serializer = factory.create(schema, key.getClass(), valClass);
 
-            byte[] bytes = serializer.serialize(key, objFactory.create());
+            BinaryRow row = serializer.serialize(key, objFactory.create());
 
-            Object key1 = serializer.deserializeKey(bytes);
-            Object val1 = serializer.deserializeValue(bytes);
+            Object key1 = serializer.deserializeKey(new Row(schema, row));
+            Object val1 = serializer.deserializeValue(new Row(schema, row));
 
             assertTrue(key.getClass().isInstance(key1));
             assertTrue(valClass.isInstance(val1));
-        }
-        finally {
+        } finally {
             Thread.currentThread().setContextClassLoader(loader);
         }
     }
@@ -367,10 +369,10 @@ public class JavaSerializerTest {
 
         Serializer serializer = factory.create(schema, key.getClass(), val.getClass());
 
-        byte[] bytes = serializer.serialize(key, val);
+        BinaryRow row = serializer.serialize(key, val);
 
-        Object key1 = serializer.deserializeKey(bytes);
-        Object val1 = serializer.deserializeValue(bytes);
+        Object key1 = serializer.deserializeKey(new Row(schema, row));
+        Object val1 = serializer.deserializeValue(new Row(schema, row));
 
         assertTrue(key.getClass().isInstance(key1));
         assertTrue(val.getClass().isInstance(val1));
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/KVSerializer.java b/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/KVMarshaller.java
similarity index 59%
rename from modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/KVSerializer.java
rename to modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/KVMarshaller.java
index f13e20a..9fc8539 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/KVSerializer.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/KVMarshaller.java
@@ -17,30 +17,40 @@
 
 package org.apache.ignite.internal.schema.marshaller;
 
+import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.row.Row;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Key-value serializer interface.
+ * Key-value marshaller interface provides method to marshal/unmarshal key and value objects to/from a row.
+ *
+ * @param <K> Key type.
+ * @param <V> Value type.
  */
-public interface KVSerializer<K, V> {
+public interface KVMarshaller<K, V> {
     /**
-     * @param key Key object to serialize.
-     * @param val Value object to serialize.
-     * @return Table row with columns serialized from given key-value pair.
+     * Marshal key and value objects to a table row.
+     *
+     * @param key Key object to marshal.
+     * @param val Value object to marshal or {@code null}.
+     * @return Table row with columns from given key-value pair.
      */
-    Row serialize(@NotNull K key, V val);
+    BinaryRow marshal(@NotNull K key, V val);
 
     /**
+     * Unmarshal row to a key object.
+     *
      * @param row Table row.
-     * @return Deserialized key object.
+     * @return Key object.
      */
-    @NotNull K deserializeKey(@NotNull Row row);
+    @NotNull K unmarshalKey(@NotNull Row row);
 
     /**
+     * Unmarshal row to a value object.
+     *
      * @param row Table row.
-     * @return Deserialized value object.
+     * @return Value object.
      */
-    @Nullable V deserializeValue(@NotNull Row row);
+    @Nullable V unmarshalValue(@NotNull Row row);
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
index a0c4b5c..e9afd75 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
@@ -23,14 +23,16 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaRegistry;
-import org.apache.ignite.internal.schema.marshaller.KVSerializer;
+import org.apache.ignite.internal.schema.marshaller.KVMarshaller;
+import org.apache.ignite.internal.schema.marshaller.SerializationException;
+import org.apache.ignite.internal.schema.marshaller.Serializer;
+import org.apache.ignite.internal.schema.marshaller.SerializerFactory;
 import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.table.InvokeProcessor;
 import org.apache.ignite.table.KeyValueView;
-import org.apache.ignite.table.mapper.KeyMapper;
-import org.apache.ignite.table.mapper.ValueMapper;
+import org.apache.ignite.table.mapper.Mapper;
 import org.apache.ignite.tx.Transaction;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -39,17 +41,34 @@ import org.jetbrains.annotations.Nullable;
  * Key-value view implementation.
  */
 public class KeyValueViewImpl<K, V> extends AbstractTableView implements KeyValueView<K, V> {
+    /** Marshaller factory. */
+    private final SerializerFactory marshallerFactory;
+
+    /** Key object mapper. */
+    private final Mapper<K> keyMapper;
+
+    /** Value object mapper. */
+    private final Mapper<V> valueMapper;
+
+    /** Marshaller. */
+    private KVMarshallerImpl<K, V> marsh;
+
     /**
      * Constructor.
+     *
      * @param tbl Table storage.
      * @param schemaReg Schema registry.
      * @param keyMapper Key class mapper.
      * @param valueMapper Value class mapper.
      * @param tx The transaction.
      */
-    public KeyValueViewImpl(InternalTable tbl, SchemaRegistry schemaReg, KeyMapper<K> keyMapper,
-                            ValueMapper<V> valueMapper, @Nullable Transaction tx) {
+    public KeyValueViewImpl(InternalTable tbl, SchemaRegistry schemaReg, Mapper<K> keyMapper,
+                            Mapper<V> valueMapper, @Nullable Transaction tx) {
         super(tbl, schemaReg, tx);
+
+        this.keyMapper = keyMapper;
+        this.valueMapper = valueMapper;
+        marshallerFactory = SerializerFactory.createJavaSerializerFactory();
     }
 
     /** {@inheritDoc} */
@@ -59,20 +78,15 @@ public class KeyValueViewImpl<K, V> extends AbstractTableView implements KeyValu
 
     /** {@inheritDoc} */
     @Override public @NotNull CompletableFuture<V> getAsync(@NotNull K key) {
-        Objects.requireNonNull(key);
-
-        final KVSerializer<K, V> marsh = marshaller();
-
-        Row kRow = marsh.serialize(key, null); // Convert to portable format to pass TX/storage layer.
+        BinaryRow kRow = marshal(Objects.requireNonNull(key), null);
 
         return tbl.get(kRow, tx)
-            .thenApply(this::wrap) // Binary -> schema-aware row
-            .thenApply(marsh::deserializeValue); // row -> deserialized obj.
+            .thenApply(this::unmarshalValue); // row -> deserialized obj.
     }
 
     /** {@inheritDoc} */
     @Override public Map<K, V> getAll(@NotNull Collection<K> keys) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        return sync(getAllAsync(keys));
     }
 
     /** {@inheritDoc} */
@@ -82,27 +96,30 @@ public class KeyValueViewImpl<K, V> extends AbstractTableView implements KeyValu
 
     /** {@inheritDoc} */
     @Override public boolean contains(@NotNull K key) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        return get(key) != null;
     }
 
     /** {@inheritDoc} */
     @Override public CompletableFuture<Boolean> containsAsync(@NotNull K key) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        return getAsync(key).thenApply(Objects::nonNull);
     }
 
     /** {@inheritDoc} */
     @Override public void put(@NotNull K key, V val) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        sync(putAsync(key, val));
     }
 
     /** {@inheritDoc} */
     @Override public @NotNull CompletableFuture<Void> putAsync(@NotNull K key, V val) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        BinaryRow kRow = marshal(Objects.requireNonNull(key), val);
+
+        return tbl.upsert(kRow, tx).thenAccept(ignore -> {
+        });
     }
 
     /** {@inheritDoc} */
     @Override public void putAll(@NotNull Map<K, V> pairs) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        sync(putAllAsync(pairs));
     }
 
     /** {@inheritDoc} */
@@ -112,47 +129,55 @@ public class KeyValueViewImpl<K, V> extends AbstractTableView implements KeyValu
 
     /** {@inheritDoc} */
     @Override public V getAndPut(@NotNull K key, V val) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        return sync(getAndPutAsync(key, val));
     }
 
     /** {@inheritDoc} */
     @Override public @NotNull CompletableFuture<V> getAndPutAsync(@NotNull K key, V val) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        BinaryRow kRow = marshal(Objects.requireNonNull(key), val);
+
+        return tbl.getAndUpsert(kRow, tx).thenApply(this::unmarshalValue);
     }
 
     /** {@inheritDoc} */
-    @Override public boolean putIfAbsent(@NotNull K key, V val) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+    @Override public boolean putIfAbsent(@NotNull K key, @NotNull V val) {
+        return sync(putIfAbsentAsync(key, val));
     }
 
     /** {@inheritDoc} */
     @Override public @NotNull CompletableFuture<Boolean> putIfAbsentAsync(@NotNull K key, V val) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        BinaryRow kRow = marshal(Objects.requireNonNull(key), val);
+
+        return tbl.insert(kRow, tx);
     }
 
     /** {@inheritDoc} */
     @Override public boolean remove(@NotNull K key) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        return sync(removeAsync(key));
     }
 
     /** {@inheritDoc} */
     @Override public @NotNull CompletableFuture<Boolean> removeAsync(@NotNull K key) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        BinaryRow kRow = marshal(Objects.requireNonNull(key), null);
+
+        return tbl.delete(kRow, tx);
     }
 
     /** {@inheritDoc} */
     @Override public boolean remove(@NotNull K key, @NotNull V val) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        return sync(removeAsync(key, val));
     }
 
     /** {@inheritDoc} */
     @Override public @NotNull CompletableFuture<Boolean> removeAsync(@NotNull K key, @NotNull V val) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        BinaryRow kRow = marshal(Objects.requireNonNull(key), val);
+
+        return tbl.deleteExact(kRow, tx);
     }
 
     /** {@inheritDoc} */
     @Override public Collection<K> removeAll(@NotNull Collection<K> keys) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        return sync(removeAllAsync(keys));
     }
 
     /** {@inheritDoc} */
@@ -162,46 +187,56 @@ public class KeyValueViewImpl<K, V> extends AbstractTableView implements KeyValu
 
     /** {@inheritDoc} */
     @Override public V getAndRemove(@NotNull K key) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        return sync(getAndRemoveAsync(key));
     }
 
     /** {@inheritDoc} */
     @Override public @NotNull CompletableFuture<V> getAndRemoveAsync(@NotNull K key) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        BinaryRow kRow = marshal(Objects.requireNonNull(key), null);
+
+        return tbl.getAndDelete(kRow, tx).thenApply(this::unmarshalValue);
     }
 
     /** {@inheritDoc} */
     @Override public boolean replace(@NotNull K key, V val) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        return sync(replaceAsync(key, val));
     }
 
     /** {@inheritDoc} */
     @Override public @NotNull CompletableFuture<Boolean> replaceAsync(@NotNull K key, V val) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        BinaryRow row = marshal(key, val);
+
+        return tbl.replace(row, tx);
     }
 
     /** {@inheritDoc} */
     @Override public boolean replace(@NotNull K key, V oldVal, V newVal) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        return sync(replaceAsync(key, oldVal, newVal));
     }
 
     /** {@inheritDoc} */
     @Override public @NotNull CompletableFuture<Boolean> replaceAsync(@NotNull K key, V oldVal, V newVal) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        BinaryRow oldRow = marshal(key, oldVal);
+        BinaryRow newRow = marshal(key, newVal);
+
+        return tbl.replace(oldRow, newRow, tx);
     }
 
     /** {@inheritDoc} */
     @Override public V getAndReplace(@NotNull K key, V val) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        return sync(getAndReplaceAsync(key, val));
     }
 
     /** {@inheritDoc} */
     @Override public @NotNull CompletableFuture<V> getAndReplaceAsync(@NotNull K key, V val) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        BinaryRow row = marshal(key, val);
+
+        return tbl.getAndReplace(row, tx).thenApply(this::unmarshalValue);
     }
 
     /** {@inheritDoc} */
-    @Override public <R extends Serializable> R invoke(@NotNull K key, InvokeProcessor<K, V, R> proc, Serializable... args) {
+    @Override
+    public <R extends Serializable> R invoke(@NotNull K key, InvokeProcessor<K, V, R> proc, Serializable... args) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
@@ -237,22 +272,93 @@ public class KeyValueViewImpl<K, V> extends AbstractTableView implements KeyValu
     }
 
     /**
+     * @param schemaVersion Schema version.
      * @return Marshaller.
      */
-    private KVSerializer<K, V> marshaller() {
-        throw new UnsupportedOperationException("Not implemented yet.");
+    private KVMarshaller<K, V> marshaller(int schemaVersion) {
+        if (marsh == null || marsh.schemaVersion == schemaVersion) {
+            // TODO: Cache marshaller for schema version or upgrade row?
+            marsh = new KVMarshallerImpl<>(
+                schemaVersion,
+                marshallerFactory.create(
+                    schemaReg.schema(schemaVersion),
+                    keyMapper.getType(),
+                    valueMapper.getType()
+                )
+            );
+        }
+
+        return marsh;
+    }
+
+    private V unmarshalValue(BinaryRow v) {
+        if (v == null || !v.hasValue())
+            return null;
+
+        Row row = schemaReg.resolve(v);
+
+        KVMarshaller<K, V> marshaller = marshaller(row.schemaVersion());
+
+        return marshaller.unmarshalValue(row);
     }
 
-    /**
-     * @param row Binary row.
-     * @return Schema-aware row.
-     */
-    private Row wrap(BinaryRow row) {
-        if (row == null)
-            return null;
+    private BinaryRow marshal(@NotNull K key, V o) {
+        final KVMarshaller<K, V> marsh = marshaller(schemaReg.lastSchemaVersion());
 
-        final SchemaDescriptor rowSchema = schemaReg.schema(row.schemaVersion()); // Get a schema for row.
+        return marsh.marshal(key, o);
+    }
 
-        return new Row(rowSchema, row);
+    /**
+     * Marshaller wrapper for KV view.
+     * Note: Serializer must be re-created if schema changed.
+     *
+     * @param <K> Key type.
+     * @param <V> Value type.
+     */
+    private static class KVMarshallerImpl<K, V> implements KVMarshaller<K, V> {
+        /** Schema version. */
+        private final int schemaVersion;
+
+        /** Serializer. */
+        private Serializer serializer;
+
+        /**
+         * Creates KV marshaller.
+         *
+         * @param schemaVersion Schema version.
+         * @param serializer Serializer.
+         */
+        KVMarshallerImpl(int schemaVersion, Serializer serializer) {
+            this.schemaVersion = schemaVersion;
+
+            this.serializer = serializer;
+        }
+
+        /** {@inheritDoc} */
+        @Override public BinaryRow marshal(@NotNull K key, V val) {
+            try {
+                return serializer.serialize(key, val);
+            } catch (SerializationException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @NotNull @Override public K unmarshalKey(@NotNull Row row) {
+            try {
+                return serializer.deserializeKey(row);
+            } catch (SerializationException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public V unmarshalValue(@NotNull Row row) {
+            try {
+                return serializer.deserializeValue(row);
+            } catch (SerializationException e) {
+                throw new IgniteException(e);
+            }
+        }
     }
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
index 8bdcc44..c6d3ed7 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
@@ -29,7 +29,7 @@ import org.apache.ignite.internal.schema.marshaller.RecordSerializer;
 import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.table.InvokeProcessor;
 import org.apache.ignite.table.RecordView;
-import org.apache.ignite.table.mapper.RecordMapper;
+import org.apache.ignite.table.mapper.Mapper;
 import org.apache.ignite.tx.Transaction;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -45,7 +45,7 @@ public class RecordViewImpl<R> extends AbstractTableView implements RecordView<R
      * @param mapper Record class mapper.
      * @param tx The transaction.
      */
-    public RecordViewImpl(InternalTable tbl, SchemaRegistry schemaReg, RecordMapper<R> mapper, @Nullable Transaction tx) {
+    public RecordViewImpl(InternalTable tbl, SchemaRegistry schemaReg, Mapper<R> mapper, @Nullable Transaction tx) {
         super(tbl, schemaReg, tx);
     }
 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
index 5247e34..2cd2f88 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
@@ -25,9 +25,7 @@ import org.apache.ignite.table.KeyValueView;
 import org.apache.ignite.table.RecordView;
 import org.apache.ignite.table.Table;
 import org.apache.ignite.table.Tuple;
-import org.apache.ignite.table.mapper.KeyMapper;
-import org.apache.ignite.table.mapper.RecordMapper;
-import org.apache.ignite.table.mapper.ValueMapper;
+import org.apache.ignite.table.mapper.Mapper;
 import org.jetbrains.annotations.NotNull;
 
 /**
@@ -85,7 +83,7 @@ public class TableImpl implements Table {
     }
 
     /** {@inheritDoc} */
-    @Override public <R> RecordView<R> recordView(RecordMapper<R> recMapper) {
+    @Override public <R> RecordView<R> recordView(Mapper<R> recMapper) {
         return new RecordViewImpl<>(tbl, schemaReg, recMapper, null);
     }
 
@@ -95,7 +93,7 @@ public class TableImpl implements Table {
     }
 
     /** {@inheritDoc} */
-    @Override public <K, V> KeyValueView<K, V> keyValueView(KeyMapper<K> keyMapper, ValueMapper<V> valMapper) {
+    @Override public <K, V> KeyValueView<K, V> keyValueView(Mapper<K> keyMapper, Mapper<V> valMapper) {
         return new KeyValueViewImpl<>(tbl, schemaReg, keyMapper, valMapper, null);
     }
 
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueBinaryViewOperationsTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueBinaryViewOperationsTest.java
index ef4a133..5871469 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueBinaryViewOperationsTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueBinaryViewOperationsTest.java
@@ -41,19 +41,28 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
  * TODO: IGNITE-14487 Check key fields in Tuple is ignored for value or exception is thrown?
  */
 public class KeyValueBinaryViewOperationsTest {
+    /** Simple schema. */
+    private SchemaDescriptor schema = new SchemaDescriptor(
+        1,
+        new Column[]{new Column("id", NativeTypes.INT64, false)},
+        new Column[]{new Column("val", NativeTypes.INT64, false)}
+    );
+
+    /**
+     * Creates table view.
+     *
+     * @return Table KV binary view.
+     */
+    private KeyValueView<Tuple, Tuple> tableView() {
+        return new KeyValueBinaryViewImpl(new DummyInternalTableImpl(), new DummySchemaManagerImpl(schema), null, null);
+    }
+
     /**
      *
      */
     @Test
     public void put() {
-        SchemaDescriptor schema = new SchemaDescriptor(
-            1,
-            new Column[] {new Column("id", NativeTypes.INT64, false)},
-            new Column[] {new Column("val", NativeTypes.INT64, false)}
-        );
-
-        KeyValueView<Tuple, Tuple> tbl =
-            new KeyValueBinaryViewImpl(new DummyInternalTableImpl(), new DummySchemaManagerImpl(schema), null, null);
+        KeyValueView<Tuple, Tuple> tbl = tableView();
 
         final Tuple key = Tuple.create().set("id", 1L);
         final Tuple val = Tuple.create().set("val", 11L);
@@ -89,14 +98,7 @@ public class KeyValueBinaryViewOperationsTest {
      */
     @Test
     public void putIfAbsent() {
-        SchemaDescriptor schema = new SchemaDescriptor(
-            1,
-            new Column[] {new Column("id", NativeTypes.INT64, false)},
-            new Column[] {new Column("val", NativeTypes.INT64, false)}
-        );
-
-        KeyValueView<Tuple, Tuple> tbl =
-            new KeyValueBinaryViewImpl(new DummyInternalTableImpl(), new DummySchemaManagerImpl(schema), null, null);
+        KeyValueView<Tuple, Tuple> tbl = tableView();
 
         final Tuple key = Tuple.create().set("id", 1L);
         final Tuple val = Tuple.create().set("val", 11L);
@@ -122,14 +124,7 @@ public class KeyValueBinaryViewOperationsTest {
      */
     @Test
     public void getAndPut() {
-        SchemaDescriptor schema = new SchemaDescriptor(
-            1,
-            new Column[] {new Column("id", NativeTypes.INT64, false)},
-            new Column[] {new Column("val", NativeTypes.INT64, false)}
-        );
-
-        KeyValueView<Tuple, Tuple> tbl =
-            new KeyValueBinaryViewImpl(new DummyInternalTableImpl(), new DummySchemaManagerImpl(schema), null, null);
+        KeyValueView<Tuple, Tuple> tbl = tableView();
 
         final Tuple key = Tuple.create().set("id", 1L);
         final Tuple val = Tuple.create().set("val", 11L);
@@ -155,15 +150,40 @@ public class KeyValueBinaryViewOperationsTest {
      *
      */
     @Test
-    public void remove() {
-        SchemaDescriptor schema = new SchemaDescriptor(
-            1,
-            new Column[] {new Column("id", NativeTypes.INT64, false)},
-            new Column[] {new Column("val", NativeTypes.INT64, false)}
-        );
+    public void contains() {
+        KeyValueView<Tuple, Tuple> tbl = tableView();
+
+        final Tuple key = Tuple.create().set("id", 1L);
+        final Tuple val = Tuple.create().set("val", 11L);
+        final Tuple val2 = Tuple.create().set("val", 22L);
+
+        // Not-existed value.
+        assertFalse(tbl.contains(key));
+
+        // Put KV pair.
+        tbl.put(key, val);
+        assertTrue(tbl.contains(Tuple.create().set("id", 1L)));
+
+        // Delete key.
+        assertTrue(tbl.remove(key));
+        assertFalse(tbl.contains(Tuple.create().set("id", 1L)));
+
+        // Put KV pair.
+        tbl.put(key, val2);
+        assertTrue(tbl.contains(Tuple.create().set("id", 1L)));
+
+        // Non-existed key.
+        assertFalse(tbl.contains(Tuple.create().set("id", 2L)));
+        tbl.remove(Tuple.create().set("id", 2L));
+        assertFalse(tbl.contains(Tuple.create().set("id", 2L)));
+    }
 
-        KeyValueView<Tuple, Tuple> tbl =
-            new KeyValueBinaryViewImpl(new DummyInternalTableImpl(), new DummySchemaManagerImpl(schema), null, null);
+    /**
+     *
+     */
+    @Test
+    public void remove() {
+        KeyValueView<Tuple, Tuple> tbl = tableView();
 
         final Tuple key = Tuple.create().set("id", 1L);
         final Tuple key2 = Tuple.create().set("id", 2L);
@@ -199,14 +219,7 @@ public class KeyValueBinaryViewOperationsTest {
      */
     @Test
     public void removeExact() {
-        SchemaDescriptor schema = new SchemaDescriptor(
-            1,
-            new Column[] {new Column("id", NativeTypes.INT64, false)},
-            new Column[] {new Column("val", NativeTypes.INT64, false)}
-        );
-
-        final KeyValueView<Tuple, Tuple> tbl =
-            new KeyValueBinaryViewImpl(new DummyInternalTableImpl(), new DummySchemaManagerImpl(schema), null, null);
+        final KeyValueView<Tuple, Tuple> tbl = tableView();
 
         final Tuple key = Tuple.create().set("id", 1L);
         final Tuple key2 = Tuple.create().set("id", 2L);
@@ -254,14 +267,7 @@ public class KeyValueBinaryViewOperationsTest {
      */
     @Test
     public void replace() {
-        SchemaDescriptor schema = new SchemaDescriptor(
-            1,
-            new Column[] {new Column("id", NativeTypes.INT64, false)},
-            new Column[] {new Column("val", NativeTypes.INT64, false)}
-        );
-
-        KeyValueView<Tuple, Tuple> tbl =
-            new KeyValueBinaryViewImpl(new DummyInternalTableImpl(), new DummySchemaManagerImpl(schema), null, null);
+        KeyValueView<Tuple, Tuple> tbl = tableView();
 
         final Tuple key = Tuple.create().set("id", 1L);
         final Tuple key2 = Tuple.create().set("id", 2L);
@@ -300,14 +306,7 @@ public class KeyValueBinaryViewOperationsTest {
      */
     @Test
     public void replaceExact() {
-        SchemaDescriptor schema = new SchemaDescriptor(
-            1,
-            new Column[] {new Column("id", NativeTypes.INT64, false)},
-            new Column[] {new Column("val", NativeTypes.INT64, false)}
-        );
-
-        KeyValueView<Tuple, Tuple> tbl =
-            new KeyValueBinaryViewImpl(new DummyInternalTableImpl(), new DummySchemaManagerImpl(schema), null, null);
+        KeyValueView<Tuple, Tuple> tbl = tableView();
 
         final Tuple key = Tuple.create().set("id", 1L);
         final Tuple key2 = Tuple.create().set("id", 2L);
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueOperationsTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueOperationsTest.java
new file mode 100644
index 0000000..18a9aa8
--- /dev/null
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueOperationsTest.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
+import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.mapper.Mapper;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Basic table operations test.
+ * <p>
+ * TODO: IGNITE-14487 Add bulk operations tests.
+ * TODO: IGNITE-14487 Add async operations tests.
+ */
+public class KeyValueOperationsTest {
+    /** Default mapper. */
+    private final Mapper<Long> mapper = new Mapper<>() {
+        @Override public Class<Long> getType() {
+            return Long.class;
+        }
+    };
+
+    /** Simple schema. */
+    private SchemaDescriptor schema = new SchemaDescriptor(
+        1,
+        new Column[]{new Column("id", NativeTypes.INT64, false)},
+        new Column[]{new Column("val", NativeTypes.INT64, false)}
+    );
+
+    /**
+     * Creates table view.
+     *
+     * @return Table KV-view.
+     */
+    private KeyValueView<Long, Long> kvView() {
+        return new KeyValueViewImpl<>(new DummyInternalTableImpl(), new DummySchemaManagerImpl(schema), mapper, mapper, null);
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void put() {
+        KeyValueView<Long, Long> tbl = kvView();
+
+        assertNull(tbl.get(1L));
+
+        // Put KV pair.
+        tbl.put(1L, 11L);
+
+        assertEquals(11L, tbl.get(1L));
+        assertEquals(11L, tbl.get(1L));
+
+        // Update KV pair.
+        tbl.put(1L, 22L);
+
+        assertEquals(22L, tbl.get(1L));
+        assertEquals(22L, tbl.get(1L));
+
+        // Remove KV pair.
+        tbl.put(1L, null);
+
+        assertNull(tbl.get(1L));
+
+        // Put KV pair.
+        tbl.put(1L, 33L);
+        assertEquals(33L, tbl.get(1L));
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void putIfAbsent() {
+        KeyValueView<Long, Long> tbl = kvView();
+
+        assertNull(tbl.get(1L));
+
+        // Insert new KV pair.
+        assertTrue(tbl.putIfAbsent(1L, 11L));
+
+        assertEquals(11L, tbl.get(1L));
+
+        // Update KV pair.
+        assertFalse(tbl.putIfAbsent(1L, 22L));
+
+        assertEquals(11L, tbl.get(1L));
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void getAndPut() {
+        KeyValueView<Long, Long> tbl = kvView();
+
+        assertNull(tbl.get(1L));
+
+        // Insert new tuple.
+        assertNull(tbl.getAndPut(1L, 11L));
+
+        assertEquals(11L, tbl.get(1L));
+
+        assertEquals(11L, tbl.getAndPut(1L, 22L));
+        assertEquals(22L, tbl.getAndPut(1L, 33L));
+
+        assertEquals(33L, tbl.get(1L));
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void contains() {
+        KeyValueView<Long, Long> tbl = kvView();
+
+        // Not-existed value.
+        assertFalse(tbl.contains(1L));
+
+        // Put KV pair.
+        tbl.put(1L, 11L);
+        assertTrue(tbl.contains(1L));
+
+        // Delete key.
+        assertTrue(tbl.remove(1L));
+        assertFalse(tbl.contains(1L));
+
+        // Put KV pair.
+        tbl.put(1L, 22L);
+        assertTrue(tbl.contains(1L));
+
+        // Delete key.
+        tbl.remove(2L);
+        assertFalse(tbl.contains(2L));
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void remove() {
+        KeyValueView<Long, Long> tbl = kvView();
+
+        // Put KV pair.
+        tbl.put(1L, 11L);
+
+        // Delete existed key.
+        assertEquals(11L, tbl.get(1L));
+        assertTrue(tbl.remove(1L));
+        assertNull(tbl.get(1L));
+
+        // Delete already deleted key.
+        assertFalse(tbl.remove(1L));
+
+        // Put KV pair.
+        tbl.put(1L, 22L);
+        assertEquals(22L, tbl.get(1L));
+
+        // Delete existed key.
+        assertTrue(tbl.remove(1L));
+        assertNull(tbl.get(1L));
+
+        // Delete not existed key.
+        assertNull(tbl.get(2L));
+        assertFalse(tbl.remove(2L));
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void removeExact() {
+        KeyValueView<Long, Long> tbl = kvView();
+
+        // Put KV pair.
+        tbl.put(1L, 11L);
+        assertEquals(11L, tbl.get(1L));
+
+        // Fails to delete KV pair with unexpected value.
+        assertFalse(tbl.remove(1L, 22L));
+        assertEquals(11L, tbl.get(1L));
+
+        // Delete KV pair with expected value.
+        assertTrue(tbl.remove(1L, 11L));
+        assertNull(tbl.get(1L));
+
+        // Once again.
+        assertFalse(tbl.remove(1L, 11L));
+        assertNull(tbl.get(1L));
+
+        // Try to remove non-existed key.
+        assertFalse(tbl.remove(1L, 11L));
+        assertNull(tbl.get(1L));
+
+        // Put KV pair.
+        tbl.put(1L, 22L);
+        assertEquals(22L, tbl.get(1L));
+
+        // Check null value ignored.
+        assertThrows(Throwable.class, () -> tbl.remove(1L, null));
+        assertEquals(22L, tbl.get(1L));
+
+        // Delete KV pair with expected value.
+        assertTrue(tbl.remove(1L, 22L));
+        assertNull(tbl.get(1L));
+
+        assertFalse(tbl.remove(2L, 22L));
+        assertNull(tbl.get(2L));
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void replace() {
+        KeyValueView<Long, Long> tbl = kvView();
+
+        // Ignore replace operation for non-existed KV pair.
+        assertFalse(tbl.replace(1L, 11L));
+        assertNull(tbl.get(1L));
+
+        tbl.put(1L, 11L);
+
+        // Replace existed KV pair.
+        assertTrue(tbl.replace(1L, 22L));
+        assertEquals(22L, tbl.get(1L));
+
+        // Remove existed KV pair.
+        assertTrue(tbl.replace(1L, null));
+        assertNull(tbl.get(1L));
+
+        // Ignore replace operation for non-existed KV pair.
+        assertFalse(tbl.replace(1L, 33L));
+        assertNull(tbl.get(1L));
+
+        tbl.put(1L, 33L);
+        assertEquals(33L, tbl.get(1L));
+
+        // Remove non-existed KV pair.
+        assertFalse(tbl.replace(2L, null));
+        assertNull(tbl.get(2L));
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void replaceExact() {
+        KeyValueView<Long, Long> tbl = kvView();
+
+        // Insert KV pair.
+        assertTrue(tbl.replace(1L, null, 11L));
+        assertEquals(11L, tbl.get(1L));
+        assertNull(tbl.get(2L));
+
+        // Ignore replace operation for non-existed KV pair.
+        assertFalse(tbl.replace(2L, 11L, 22L));
+        assertNull(tbl.get(2L));
+
+        // Replace existed KV pair.
+        assertTrue(tbl.replace(1L, 11L, 22L));
+        assertEquals(22L, tbl.get(1L));
+
+        // Remove existed KV pair.
+        assertTrue(tbl.replace(1L, 22L, null));
+        assertNull(tbl.get(1L));
+
+        // Insert KV pair.
+        assertTrue(tbl.replace(1L, null, 33L));
+        assertEquals(33L, tbl.get(1L));
+
+        // Remove non-existed KV pair.
+        assertTrue(tbl.replace(2L, null, null));
+    }
+}