You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2021/11/17 15:29:40 UTC

[ignite-3] 01/01: Add mapped types validation. Fix single column mapping. Support POJO fields in mapper. Add tests.

This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-15784
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 6bf4e2b152eb265ac56a985288e61ea676ad828f
Author: Andrew Mashenkov <an...@gmail.com>
AuthorDate: Tue Nov 16 14:26:09 2021 +0300

    Add mapped types validation.
    Fix single column mapping.
    Support POJO fields in mapper.
    Add tests.
    
    Signed-off-by: Andrew Mashenkov <an...@gmail.com>
---
 .../ignite/table/mapper/DefaultColumnMapper.java   |  12 +-
 .../apache/ignite/table/mapper/IdentityMapper.java |  28 +-
 .../org/apache/ignite/table/mapper/Mapper.java     |  92 ++++-
 .../apache/ignite/table/mapper/MapperBuilder.java  |  60 +--
 ...ltColumnMapper.java => SingleColumnMapper.java} |  42 +-
 .../internal/schema/marshaller/BinaryMode.java     |   5 +-
 .../schema/marshaller/MarshallerFactory.java       |   8 +-
 .../internal/schema/marshaller/MarshallerUtil.java |   9 +-
 .../marshaller/asm/AsmSerializerGenerator.java     |   2 +-
 .../marshaller/reflection/FieldAccessor.java       | 437 +++++++++++++--------
 .../schema/marshaller/reflection/Marshaller.java   |  90 +++--
 .../schema/marshaller/KvMarshallerTest.java        | 423 +++++++++++++-------
 .../internal/schema/marshaller/MapperTest.java     |  68 +++-
 .../schema/testobjects/TestOuterObject.java        |  33 ++
 .../org/apache/ignite/internal/table/Example.java  | 103 +++--
 15 files changed, 931 insertions(+), 481 deletions(-)

diff --git a/modules/api/src/main/java/org/apache/ignite/table/mapper/DefaultColumnMapper.java b/modules/api/src/main/java/org/apache/ignite/table/mapper/DefaultColumnMapper.java
index e3efc13..f556b81 100644
--- a/modules/api/src/main/java/org/apache/ignite/table/mapper/DefaultColumnMapper.java
+++ b/modules/api/src/main/java/org/apache/ignite/table/mapper/DefaultColumnMapper.java
@@ -21,7 +21,7 @@ import java.util.Map;
 import org.jetbrains.annotations.NotNull;
 
 /**
- * Simple mapper implementation that map a column to a class field.
+ * Mapper implementation which maps object fields to the columns by their names.
  *
  * @param <T> Target type.
  */
@@ -47,9 +47,15 @@ class DefaultColumnMapper<T> implements Mapper<T> {
     @Override public Class<T> targetType() {
         return targetType;
     }
-    
+
+    /** {@inheritDoc} */
+    @Override
+    public String mappedColumn() {
+        return null;
+    }
+
     /** {@inheritDoc} */
-    @Override public String columnToField(@NotNull String columnName) {
+    @Override public String mappedField(@NotNull String columnName) {
         return mapping.get(columnName);
     }
 }
diff --git a/modules/api/src/main/java/org/apache/ignite/table/mapper/IdentityMapper.java b/modules/api/src/main/java/org/apache/ignite/table/mapper/IdentityMapper.java
index 7e8d9e2..0f5c458 100644
--- a/modules/api/src/main/java/org/apache/ignite/table/mapper/IdentityMapper.java
+++ b/modules/api/src/main/java/org/apache/ignite/table/mapper/IdentityMapper.java
@@ -23,17 +23,17 @@ import java.util.Set;
 import org.jetbrains.annotations.NotNull;
 
 /**
- * Trivial mapper implementation that maps a column to a field with the same name.
+ * Simple mapper implementation which maps POJO fields to the columns with the same name.
  *
  * @param <T> Target type.
  */
 class IdentityMapper<T> implements Mapper<T> {
     /** Target type. */
     private final Class<T> targetType;
-    
+
     /** Class field names. */
     private final Set<String> fieldsNames;
-    
+
     /**
      * Creates a mapper for given class.
      *
@@ -41,23 +41,33 @@ class IdentityMapper<T> implements Mapper<T> {
      */
     IdentityMapper(Class<T> targetType) {
         this.targetType = targetType;
-        
+
+        //TODO: process inherited fields.
         Field[] fields = targetType.getDeclaredFields();
         fieldsNames = new HashSet<>(fields.length);
-        
+
         for (int i = 0; i < fields.length; i++) {
+            //TODO Filter out 'static' fields.
             //TODO IGNITE-15787 Filter out 'transient' fields.
             fieldsNames.add(fields[i].getName());
         }
     }
-    
+
     /** {@inheritDoc} */
-    @Override public Class<T> targetType() {
+    @Override
+    public Class<T> targetType() {
         return targetType;
     }
-    
+
+    /** {@inheritDoc} */
+    @Override
+    public String mappedColumn() {
+        return null;
+    }
+
     /** {@inheritDoc} */
-    @Override public String columnToField(@NotNull String columnName) {
+    @Override
+    public String mappedField(@NotNull String columnName) {
         return fieldsNames.contains(columnName) ? columnName : null;
     }
 }
diff --git a/modules/api/src/main/java/org/apache/ignite/table/mapper/Mapper.java b/modules/api/src/main/java/org/apache/ignite/table/mapper/Mapper.java
index 5bdb4ed..ba5a027 100644
--- a/modules/api/src/main/java/org/apache/ignite/table/mapper/Mapper.java
+++ b/modules/api/src/main/java/org/apache/ignite/table/mapper/Mapper.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.table.mapper;
 
+import java.lang.reflect.Modifier;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
@@ -27,50 +28,101 @@ import org.jetbrains.annotations.Nullable;
  */
 public interface Mapper<T> {
     /**
-     * Creates a mapper for a class.
+     * Creates a mapper for a case when object individual fields map to the column with the same names.
      *
-     * @param cls Key class.
-     * @param <K> Key type.
+     * @param cls Target object class.
      * @return Mapper.
      */
-    static <K> Mapper<K> of(Class<K> cls) {
+    static <O> Mapper<O> of(Class<O> cls) {
         return identity(cls);
     }
-    
+
+    /**
+     * Creates a mapper for a simple case when a whole object maps to a single column.
+     *
+     * @param columnName Column name.
+     * @param cls        Target object class.
+     * @return Single column mapper.
+     */
+    static <O> Mapper<O> of(String columnName, Class<O> cls) {
+        return new SingleColumnMapper<>(ensureValidKind(cls), columnName);
+    }
+
     /**
-     * Creates a mapper builder for a class.
+     * Creates a mapper builder for objects of given class.
      *
-     * @param cls Value class.
-     * @param <V> Value type.
+     * @param cls Target object class.
      * @return Mapper builder.
+     * @throws IllegalArgumentException If class is of unsupported kind. E.g. inner, anonymous or local.
      */
-    static <V> MapperBuilder<V> builderFor(Class<V> cls) {
+    static <O> MapperBuilder<O> builderFor(Class<O> cls) {
+        ensureDefaultConsturctor(ensureValidKind(cls));
+
         return new MapperBuilder<>(cls);
     }
-    
+
     /**
      * Creates identity mapper which is used for simple types that have native support or objects with field names that match column names.
      *
-     * @param targetClass Target type class.
+     * @param cls Target type class.
      * @param <T>         Target type.
      * @return Mapper.
      */
-    static <T> Mapper<T> identity(Class<T> targetClass) {
-        return new IdentityMapper<T>(targetClass);
+    static <T> Mapper<T> identity(Class<T> cls) {
+        return new IdentityMapper<T>(ensureValidKind(cls));
+    }
+
+    /**
+     * Ensures class is of the supported kind.
+     *
+     * @param cls Class to validate.
+     * @return {@code cls} if it is valid.
+     * @throws IllegalArgumentException If {@code cls} is invalid and can't be used in mapping.
+     */
+    private static <T> Class<T> ensureValidKind(Class<T> cls) {
+        if (cls.isAnonymousClass() || cls.isLocalClass() || cls.isSynthetic() || cls.isPrimitive() ||
+                (cls.isMemberClass() && !Modifier.isStatic(cls.getModifiers()))) {
+            throw new IllegalArgumentException("Class is of unsupported kind.");
+        }
+
+        return cls;
     }
-    
+
     /**
-     * Return mapped type.
+     * Ensures class has default constructor.
      *
-     * @return Mapped type.
+     * @param cls Class to validate.
+     * @throws IllegalArgumentException If {@code cls} can't be used in mapping.
+     */
+    static <O> void ensureDefaultConsturctor(Class<O> cls) {
+        try {
+            cls.getDeclaredConstructor(new Class[0]);
+        } catch (NoSuchMethodException e) {
+            throw new IllegalArgumentException("Class must have default constructor.");
+        }
+    }
+
+    /**
+     * Returns a type which objects (or their fields) are mapped with the columns.
+     *
+     * @return Mapper target type.
      */
     Class<T> targetType();
-    
+
+    /**
+     * Returns a column name if the whole object is mapped to the single column, otherwise, returns {@code null} and individual column
+     * mapping (see {@link #mappedField(String)) should be used}.
+     *
+     * @return Column name that a whole object is mapped to, or {@code null}.
+     */
+    String mappedColumn();
+
     /**
-     * Maps a column name to a field name.
+     * Return a field name for given column name when POJO individual fields are mapped to columns, otherwise fails.
      *
      * @param columnName Column name.
      * @return Field name or {@code null} if no field mapped to a column.
+     * @throws IllegalStateException If a whole object is mapped to a single column.
      */
-    @Nullable String columnToField(@NotNull String columnName);
-}
+    @Nullable String mappedField(@NotNull String columnName);
+}
\ No newline at end of file
diff --git a/modules/api/src/main/java/org/apache/ignite/table/mapper/MapperBuilder.java b/modules/api/src/main/java/org/apache/ignite/table/mapper/MapperBuilder.java
index e1357e2..a562190 100644
--- a/modules/api/src/main/java/org/apache/ignite/table/mapper/MapperBuilder.java
+++ b/modules/api/src/main/java/org/apache/ignite/table/mapper/MapperBuilder.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.table.mapper;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -33,10 +34,14 @@ import org.jetbrains.annotations.NotNull;
 public final class MapperBuilder<T> {
     /** Target type. */
     private Class<T> targetType;
-    
+
     /** Column-to-field name mapping. */
     private Map<String, String> columnToFields;
-    
+
+    private String rawPojoColumnName;
+
+    private boolean isStale;
+
     /**
      * Creates a mapper builder for a type.
      *
@@ -44,10 +49,10 @@ public final class MapperBuilder<T> {
      */
     MapperBuilder(@NotNull Class<T> targetType) {
         this.targetType = targetType;
-        
+
         columnToFields = new HashMap<>(targetType.getDeclaredFields().length);
     }
-    
+
     /**
      * Add mapping for a field to a column.
      *
@@ -55,20 +60,18 @@ public final class MapperBuilder<T> {
      * @param columnName Column name.
      * @return {@code this} for chaining.
      * @throws IllegalArgumentException if a column was already mapped to some field.
-     * @throws IllegalStateException if tries to reuse the builder after a mapping has been built.
+     * @throws IllegalStateException    if tries to reuse the builder after a mapping has been built.
      */
     public MapperBuilder<T> map(@NotNull String fieldName, @NotNull String columnName) {
-        if (columnToFields == null) {
+        if (isStale) {
             throw new IllegalStateException("Mapper builder can't be reused.");
-        }
-        
-        if (columnToFields.put(Objects.requireNonNull(columnName), Objects.requireNonNull(fieldName)) != null) {
+        } else if (columnToFields.put(Objects.requireNonNull(columnName), Objects.requireNonNull(fieldName)) != null) {
             throw new IllegalArgumentException("Mapping for a column already exists: " + columnName);
         }
-        
+
         return this;
     }
-    
+
     /**
      * Map a field to a type of given class.
      *
@@ -79,7 +82,7 @@ public final class MapperBuilder<T> {
     public MapperBuilder<T> map(@NotNull String fieldName, Class<?> targetClass) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
-    
+
     /**
      * Adds a functional mapping for a field, the result depends on function call for every particular row.
      *
@@ -90,17 +93,26 @@ public final class MapperBuilder<T> {
     public MapperBuilder<T> map(@NotNull String fieldName, Function<Tuple, Object> mappingFunction) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
-    
+
     /**
-     * Sets a target class to deserialize to.
+     * Map an object to a binary column.
      *
-     * @param targetClass Target class.
+     * @param columnName Column name.
      * @return {@code this} for chaining.
      */
-    public MapperBuilder<T> deserializeTo(@NotNull Class<?> targetClass) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+    public MapperBuilder<T> toBinaryColumn(String columnName) {
+        if (isStale) {
+            throw new IllegalStateException("Mapper builder can't be reused.");
+        } else if (!columnToFields.isEmpty()) {
+            throw new IllegalStateException(
+                    "Can't map object to a binary column because the mapping for it`s individual fields is already defined.");
+        }
+
+        columnToFields = null;
+
+        return this;
     }
-    
+
     /**
      * Builds mapper.
      *
@@ -108,22 +120,24 @@ public final class MapperBuilder<T> {
      * @throws IllegalStateException if nothing were mapped or more than one column were mapped to the same field.
      */
     public Mapper<T> build() {
+        isStale = true;
+
         if (columnToFields.isEmpty()) {
             throw new IllegalStateException("Empty mapping isn't allowed.");
         }
-        
+
         Map<String, String> mapping = this.columnToFields;
-        
+
         this.columnToFields = null;
-        
+
         HashSet<String> fields = new HashSet<>(mapping.size());
-        
+
         for (String f : mapping.values()) {
             if (!fields.add(f)) {
                 throw new IllegalStateException("More than one column is mapped to the field: field=" + f);
             }
         }
-        
+
         return new DefaultColumnMapper<>(targetType, mapping);
     }
 }
diff --git a/modules/api/src/main/java/org/apache/ignite/table/mapper/DefaultColumnMapper.java b/modules/api/src/main/java/org/apache/ignite/table/mapper/SingleColumnMapper.java
similarity index 61%
copy from modules/api/src/main/java/org/apache/ignite/table/mapper/DefaultColumnMapper.java
copy to modules/api/src/main/java/org/apache/ignite/table/mapper/SingleColumnMapper.java
index e3efc13..9ba1ab4 100644
--- a/modules/api/src/main/java/org/apache/ignite/table/mapper/DefaultColumnMapper.java
+++ b/modules/api/src/main/java/org/apache/ignite/table/mapper/SingleColumnMapper.java
@@ -17,39 +17,41 @@
 
 package org.apache.ignite.table.mapper;
 
-import java.util.Map;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 /**
- * Simple mapper implementation that map a column to a class field.
+ * Simple mapper implementation that maps a whole object (of the target type) to a single column.
  *
  * @param <T> Target type.
  */
-class DefaultColumnMapper<T> implements Mapper<T> {
+class SingleColumnMapper<T> implements Mapper<T> {
     /** Target type. */
     private final Class<T> targetType;
-    
-    /** Column-to-field name mapping. */
-    private final Map<String, String> mapping;
-    
-    /**
-     * Creates a mapper for given type.
-     *
-     * @param targetType Target type.
-     * @param mapping Column-to-field name mapping.
-     */
-    DefaultColumnMapper(Class<T> targetType, Map<String, String> mapping) {
+
+    /** Column name. */
+    private final String mappedColumn;
+
+    SingleColumnMapper(Class<T> targetType, @NotNull String mappedColumn) {
         this.targetType = targetType;
-        this.mapping = mapping;
+        this.mappedColumn = mappedColumn;
     }
-    
+
     /** {@inheritDoc} */
-    @Override public Class<T> targetType() {
+    @Override
+    public Class<T> targetType() {
         return targetType;
     }
-    
+
+    /** {@inheritDoc} */
+    @Override
+    public String mappedColumn() {
+        return mappedColumn;
+    }
+
     /** {@inheritDoc} */
-    @Override public String columnToField(@NotNull String columnName) {
-        return mapping.get(columnName);
+    @Override
+    public @Nullable String mappedField(@NotNull String columnName) {
+        throw new UnsupportedOperationException("Not intended for individual fields mapping.");
     }
 }
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/BinaryMode.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/BinaryMode.java
index 6f73d2c..27d40c9 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/BinaryMode.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/BinaryMode.java
@@ -87,7 +87,10 @@ public enum BinaryMode {
     DATETIME(NativeTypeSpec.DATETIME),
 
     /** Timestamp. */
-    TIMESTAMP(NativeTypeSpec.TIMESTAMP);
+    TIMESTAMP(NativeTypeSpec.TIMESTAMP),
+
+    /** User object. */
+    POJO(NativeTypeSpec.BYTES);
 
     /** Native type spec. */
     private final NativeTypeSpec typeSpec;
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/MarshallerFactory.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/MarshallerFactory.java
index 3c8858e..208074c 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/MarshallerFactory.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/MarshallerFactory.java
@@ -42,11 +42,11 @@ public interface MarshallerFactory {
      * @param keyClass Key type.
      * @param valueClass Value type.
      * @return Key-value marshaller.
-     * @see Mapper#identity(Class)
+     * @see Mapper#of(Class)
      */
     default <K, V> KvMarshaller<K, V> create(SchemaDescriptor schema, Class<K> keyClass,
             Class<V> valueClass) {
-        return create(schema, Mapper.identity(keyClass), Mapper.identity(valueClass));
+        return create(schema, Mapper.of(keyClass), Mapper.of(valueClass));
     }
     
     /**
@@ -64,9 +64,9 @@ public interface MarshallerFactory {
      * @param schema      Schema descriptor.
      * @param recordClass Record type.
      * @return Record marshaller.
-     * @see Mapper#identity(Class)
+     * @see Mapper#of(Class)
      */
     default <R> RecordMarshaller<R> create(SchemaDescriptor schema, Class<R> recordClass) {
-        return create(schema, Mapper.identity(recordClass));
+        return create(schema, Mapper.of(recordClass));
     }
 }
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/MarshallerUtil.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/MarshallerUtil.java
index 08c0e52..4295fdf 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/MarshallerUtil.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/MarshallerUtil.java
@@ -45,8 +45,9 @@ public final class MarshallerUtil {
     public static int getValueSize(Object val, NativeType type) throws InvalidTypeException {
         switch (type.spec()) {
             case BYTES:
-                return ((byte[]) val).length;
-            
+                // Return zero for pojos as they are not serialized yet.
+               return (val instanceof byte[]) ? ((byte[]) val).length : 0;
+
             case STRING:
                 // Overestimating size here prevents from later unwanted row buffer expanding.
                 return ((CharSequence) val).length() << 1;
@@ -118,7 +119,7 @@ public final class MarshallerUtil {
             return BinaryMode.DECIMAL;
         }
         
-        return null;
+        return BinaryMode.POJO;
     }
     
     /**
@@ -129,7 +130,7 @@ public final class MarshallerUtil {
      * @return Object factory.
      */
     public static <T> ObjectFactory<T> factoryForClass(Class<T> targetCls) {
-        if (mode(targetCls) == null) {
+        if (mode(targetCls) == BinaryMode.POJO) {
             return new ObjectFactory<>(targetCls);
         } else {
             return null;
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/asm/AsmSerializerGenerator.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/asm/AsmSerializerGenerator.java
index a51e02f..9c381fc 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/asm/AsmSerializerGenerator.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/asm/AsmSerializerGenerator.java
@@ -184,7 +184,7 @@ public class AsmSerializerGenerator implements SerializerFactory {
     ) {
         final BinaryMode mode = MarshallerUtil.mode(cls);
     
-        if (mode == null) {
+        if (mode == BinaryMode.POJO) {
             return new ObjectMarshallerCodeGenerator(columns, cls, firstColIdx);
         } else {
             return new IdentityMarshallerCodeGenerator(cls, ColumnAccessCodeGenerator.createAccessor(mode, firstColIdx));
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/FieldAccessor.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/FieldAccessor.java
index 54387ba..b9742e3 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/FieldAccessor.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/FieldAccessor.java
@@ -17,6 +17,11 @@
 
 package org.apache.ignite.internal.schema.marshaller.reflection;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.lang.invoke.MethodHandles;
 import java.lang.invoke.VarHandle;
 import java.lang.reflect.Field;
@@ -43,21 +48,21 @@ import org.apache.ignite.internal.schema.row.RowAssembler;
 abstract class FieldAccessor {
     /** VarHandle. */
     protected final VarHandle varHandle;
-    
+
     /** Mode. */
     protected final BinaryMode mode;
-    
+
     /**
      * Mapped column position in the schema.
      *
      * <p>NODE: Do not mix up with column index in {@link Columns} container.
      */
     protected final int colIdx;
-    
+
     static FieldAccessor noopAccessor(Column col) {
         return new UnmappedFieldAccessor(col);
     }
-    
+
     /**
      * Create accessor for the field.
      *
@@ -70,37 +75,37 @@ abstract class FieldAccessor {
     static FieldAccessor create(Class<?> type, String fldName, Column col, int colIdx) {
         try {
             final Field field = type.getDeclaredField(fldName);
-            
+
             if (field.getType().isPrimitive() && col.nullable()) {
                 throw new IllegalArgumentException("Failed to map non-nullable field to nullable column [name=" + field.getName() + ']');
             }
-            
+
             BinaryMode mode = MarshallerUtil.mode(field.getType());
             final MethodHandles.Lookup lookup = MethodHandles.privateLookupIn(type, MethodHandles.lookup());
-            
+
             VarHandle varHandle = lookup.unreflectVarHandle(field);
-            
+
             assert mode != null : "Invalid mode for type: " + field.getType();
-            
+
             switch (mode) {
                 case P_BYTE:
                     return new BytePrimitiveAccessor(varHandle, colIdx);
-                
+
                 case P_SHORT:
                     return new ShortPrimitiveAccessor(varHandle, colIdx);
-                
+
                 case P_INT:
                     return new IntPrimitiveAccessor(varHandle, colIdx);
-                
+
                 case P_LONG:
                     return new LongPrimitiveAccessor(varHandle, colIdx);
-                
+
                 case P_FLOAT:
                     return new FloatPrimitiveAccessor(varHandle, colIdx);
-                
+
                 case P_DOUBLE:
                     return new DoublePrimitiveAccessor(varHandle, colIdx);
-                
+
                 case BYTE:
                 case SHORT:
                 case INT:
@@ -118,17 +123,19 @@ abstract class FieldAccessor {
                 case DATETIME:
                 case TIMESTAMP:
                     return new ReferenceFieldAccessor(varHandle, colIdx, mode);
-                
+
+                case POJO:
+                    return new PojoFieldAccessor(varHandle, colIdx);
+
                 default:
-                    assert false : "Invalid mode " + mode;
+                    throw new IllegalArgumentException("Failed to create accessor for field [name=" + field.getName() + ']');
             }
-            
-            throw new IllegalArgumentException("Failed to create accessor for field [name=" + field.getName() + ']');
+
         } catch (NoSuchFieldException | SecurityException | IllegalAccessException ex) {
             throw new IllegalArgumentException(ex);
         }
     }
-    
+
     /**
      * Create accessor for the field.
      *
@@ -147,7 +154,7 @@ abstract class FieldAccessor {
             case P_FLOAT:
             case P_DOUBLE:
                 throw new IllegalArgumentException("Primitive key/value types are not possible by API contract.");
-            
+
             case BYTE:
             case SHORT:
             case INT:
@@ -165,14 +172,15 @@ abstract class FieldAccessor {
             case DATETIME:
             case TIMESTAMP:
                 return new IdentityAccessor(colIdx, mode);
-            
+
+            case POJO:
+                return new PojoIdentityAccessor(colIdx);
+
             default:
-                assert false : "Invalid mode " + mode;
+                throw new IllegalArgumentException("Failed to create accessor for column [name=" + col.name() + ']');
         }
-        
-        throw new IllegalArgumentException("Failed to create accessor for column [name=" + col.name() + ']');
     }
-    
+
     /**
      * Reads value object from row.
      *
@@ -184,97 +192,97 @@ abstract class FieldAccessor {
     private static Object readRefValue(Row reader, int colIdx, BinaryMode mode) {
         assert reader != null;
         assert colIdx >= 0;
-        
+
         Object val = null;
-        
+
         switch (mode) {
             case BYTE:
                 val = reader.byteValueBoxed(colIdx);
-                
+
                 break;
-            
+
             case SHORT:
                 val = reader.shortValueBoxed(colIdx);
-                
+
                 break;
-            
+
             case INT:
                 val = reader.intValueBoxed(colIdx);
-                
+
                 break;
-            
+
             case LONG:
                 val = reader.longValueBoxed(colIdx);
-                
+
                 break;
-            
+
             case FLOAT:
                 val = reader.floatValueBoxed(colIdx);
-                
+
                 break;
-            
+
             case DOUBLE:
                 val = reader.doubleValueBoxed(colIdx);
-                
+
                 break;
-            
+
             case STRING:
                 val = reader.stringValue(colIdx);
-                
+
                 break;
-            
+
             case UUID:
                 val = reader.uuidValue(colIdx);
-                
+
                 break;
-            
+
             case BYTE_ARR:
                 val = reader.bytesValue(colIdx);
-                
+
                 break;
-            
+
             case BITSET:
                 val = reader.bitmaskValue(colIdx);
-                
+
                 break;
-            
+
             case NUMBER:
                 val = reader.numberValue(colIdx);
-                
+
                 break;
-            
+
             case DECIMAL:
                 val = reader.decimalValue(colIdx);
-                
+
                 break;
-            
+
             case DATE:
                 val = reader.dateValue(colIdx);
-                
+
                 break;
-            
+
             case TIME:
                 val = reader.timeValue(colIdx);
-                
+
                 break;
-            
+
             case TIMESTAMP:
                 val = reader.timestampValue(colIdx);
-                
+
                 break;
-            
+
             case DATETIME:
                 val = reader.dateTimeValue(colIdx);
-                
+
                 break;
-            
+
             default:
-                assert false : "Invalid mode: " + mode;
+                throw new IllegalStateException("Invalid column write node: " + mode);
         }
-        
+
         return val;
     }
-    
+
     /**
      * Writes reference value to row.
      *
@@ -284,99 +292,99 @@ abstract class FieldAccessor {
      */
     private static void writeRefObject(Object val, RowAssembler writer, BinaryMode mode) {
         assert writer != null;
-        
+
         if (val == null) {
             writer.appendNull();
-            
+
             return;
         }
-        
+
         switch (mode) {
             case BYTE:
                 writer.appendByte((Byte) val);
-                
+
                 break;
-            
+
             case SHORT:
                 writer.appendShort((Short) val);
-                
+
                 break;
-            
+
             case INT:
                 writer.appendInt((Integer) val);
-                
+
                 break;
-            
+
             case LONG:
                 writer.appendLong((Long) val);
-                
+
                 break;
-            
+
             case FLOAT:
                 writer.appendFloat((Float) val);
-                
+
                 break;
-            
+
             case DOUBLE:
                 writer.appendDouble((Double) val);
-                
+
                 break;
-            
+
             case STRING:
                 writer.appendString((String) val);
-                
+
                 break;
-            
+
             case UUID:
                 writer.appendUuid((UUID) val);
-                
+
                 break;
-            
+
             case BYTE_ARR:
                 writer.appendBytes((byte[]) val);
-                
+
                 break;
-            
+
             case BITSET:
                 writer.appendBitmask((BitSet) val);
-                
+
                 break;
-            
+
             case NUMBER:
                 writer.appendNumber((BigInteger) val);
-                
+
                 break;
-            
+
             case DECIMAL:
                 writer.appendDecimal((BigDecimal) val);
-                
+
                 break;
-            
+
             case DATE:
                 writer.appendDate((LocalDate) val);
-                
+
                 break;
-            
+
             case TIME:
                 writer.appendTime((LocalTime) val);
-                
+
                 break;
-            
+
             case TIMESTAMP:
                 writer.appendTimestamp((Instant) val);
-                
+
                 break;
-            
+
             case DATETIME:
                 writer.appendDateTime((LocalDateTime) val);
-                
+
                 break;
-            
+
             default:
-                assert false : "Invalid mode: " + mode;
+                throw new IllegalStateException("Invalid column write node: " + mode);
         }
     }
-    
+
     /**
      * Constructor.
      *
@@ -386,12 +394,12 @@ abstract class FieldAccessor {
      */
     protected FieldAccessor(VarHandle varHandle, int colIdx, BinaryMode mode) {
         assert colIdx >= 0;
-        
+
         this.colIdx = colIdx;
         this.mode = Objects.requireNonNull(mode);
         this.varHandle = Objects.requireNonNull(varHandle);
     }
-    
+
     /**
      * Constructor.
      *
@@ -400,12 +408,12 @@ abstract class FieldAccessor {
      */
     private FieldAccessor(int colIdx, BinaryMode mode) {
         assert colIdx >= 0;
-        
+
         this.colIdx = colIdx;
         this.mode = mode;
         varHandle = null;
     }
-    
+
     /**
      * Write object field value to row.
      *
@@ -420,7 +428,7 @@ abstract class FieldAccessor {
             throw new MarshallerException("Failed to write field [id=" + colIdx + ']', ex);
         }
     }
-    
+
     /**
      * Write object field value to row.
      *
@@ -429,7 +437,7 @@ abstract class FieldAccessor {
      * @throws Exception If write failed.
      */
     protected abstract void write0(RowAssembler writer, Object obj) throws Exception;
-    
+
     /**
      * Reads value fom row to object field.
      *
@@ -444,17 +452,17 @@ abstract class FieldAccessor {
             throw new MarshallerException("Failed to read field [id=" + colIdx + ']', ex);
         }
     }
-    
+
     /**
      * Read an object from a row.
      *
      * @param reader Row reader.
      * @return Object.
      */
-    public Object read(Row reader) {
+    public Object read(Row reader) throws MarshallerException {
         throw new UnsupportedOperationException();
     }
-    
+
     /**
      * Reads value fom row to object field.
      *
@@ -463,7 +471,7 @@ abstract class FieldAccessor {
      * @throws Exception If failed.
      */
     protected abstract void read0(Row reader, Object obj) throws Exception;
-    
+
     /**
      * Reads object field value.
      *
@@ -473,14 +481,14 @@ abstract class FieldAccessor {
     Object value(Object obj) {
         return varHandle.get(Objects.requireNonNull(obj));
     }
-    
+
     /**
      * Stubbed accessor for unused columns writes default column value, and ignore value on read access.
      */
     private static class UnmappedFieldAccessor extends FieldAccessor {
         /** Column. */
         private final Column col;
-        
+
         /**
          * Constructor.
          *
@@ -490,26 +498,26 @@ abstract class FieldAccessor {
             super(0, null);
             this.col = col;
         }
-        
+
         /** {@inheritDoc} */
         @Override
         protected void read0(Row reader, Object obj) {
             // No-op.
         }
-        
+
         /** {@inheritDoc} */
         @Override
         protected void write0(RowAssembler writer, Object obj) {
             RowAssembler.writeValue(writer, col, col.defaultValue());
         }
-        
+
         /** {@inheritDoc} */
         @Override
         Object value(Object obj) {
             return col.defaultValue();
         }
     }
-    
+
     /**
      * Accessor for a field of primitive {@code byte} type.
      */
@@ -523,32 +531,90 @@ abstract class FieldAccessor {
         IdentityAccessor(int colIdx, BinaryMode mode) {
             super(colIdx, mode);
         }
-        
+
         /** {@inheritDoc} */
         @Override
         protected void write0(RowAssembler writer, Object obj) {
             writeRefObject(obj, writer, mode);
         }
-        
+
         /** {@inheritDoc} */
         @Override
         protected void read0(Row reader, Object obj) {
             throw new UnsupportedOperationException("Called identity accessor for object field.");
         }
-        
+
         /** {@inheritDoc} */
         @Override
         public Object read(Row reader) {
             return readRefValue(reader, colIdx, mode);
         }
-        
+
+        /** {@inheritDoc} */
+        @Override
+        Object value(Object obj) {
+            return obj;
+        }
+    }
+
+    /**
+     * Accessor for a field of primitive {@code byte} type.
+     */
+    private static class PojoIdentityAccessor extends FieldAccessor {
+        /**
+         * Constructor.
+         *
+         * @param colIdx Column index.
+         */
+        PojoIdentityAccessor(int colIdx) {
+            super(colIdx, BinaryMode.BYTE_ARR);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        protected void write0(RowAssembler writer, Object obj) throws Exception {
+            assert obj != null;
+
+            ByteArrayOutputStream out = new ByteArrayOutputStream(512);
+
+            try (ObjectOutputStream oos = new ObjectOutputStream(out)) {
+                oos.writeObject(obj);
+            }
+
+            obj = out.toByteArray();
+
+            writeRefObject(obj, writer, BinaryMode.BYTE_ARR);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        protected void read0(Row reader, Object obj) {
+            throw new UnsupportedOperationException("Called identity accessor for object field.");
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public Object read(Row reader) throws MarshallerException {
+            Object val = readRefValue(reader, colIdx, BinaryMode.BYTE_ARR);
+
+            if (val != null) {
+                try (ObjectInputStream is = new ObjectInputStream(new ByteArrayInputStream((byte[]) val))) {
+                    val = is.readObject();
+                } catch (IOException | ClassNotFoundException e) {
+                    throw new MarshallerException(e);
+                }
+            }
+
+            return val;
+        }
+
         /** {@inheritDoc} */
         @Override
         Object value(Object obj) {
             return obj;
         }
     }
-    
+
     /**
      * Accessor for a field of primitive {@code byte} type.
      */
@@ -562,24 +628,24 @@ abstract class FieldAccessor {
         BytePrimitiveAccessor(VarHandle varHandle, int colIdx) {
             super(Objects.requireNonNull(varHandle), colIdx, BinaryMode.P_BYTE);
         }
-        
+
         /** {@inheritDoc} */
         @Override
         protected void write0(RowAssembler writer, Object obj) {
             final byte val = (byte) varHandle.get(obj);
-            
+
             writer.appendByte(val);
         }
-        
+
         /** {@inheritDoc} */
         @Override
         protected void read0(Row reader, Object obj) {
             final byte val = reader.byteValue(colIdx);
-            
+
             varHandle.set(obj, val);
         }
     }
-    
+
     /**
      * Accessor for a field of primitive {@code short} type.
      */
@@ -593,24 +659,24 @@ abstract class FieldAccessor {
         ShortPrimitiveAccessor(VarHandle varHandle, int colIdx) {
             super(Objects.requireNonNull(varHandle), colIdx, BinaryMode.P_SHORT);
         }
-        
+
         /** {@inheritDoc} */
         @Override
         protected void write0(RowAssembler writer, Object obj) {
             final short val = (short) varHandle.get(obj);
-            
+
             writer.appendShort(val);
         }
-        
+
         /** {@inheritDoc} */
         @Override
         protected void read0(Row reader, Object obj) {
             final short val = reader.shortValue(colIdx);
-            
+
             varHandle.set(obj, val);
         }
     }
-    
+
     /**
      * Accessor for a field of primitive {@code int} type.
      */
@@ -624,24 +690,24 @@ abstract class FieldAccessor {
         IntPrimitiveAccessor(VarHandle varHandle, int colIdx) {
             super(Objects.requireNonNull(varHandle), colIdx, BinaryMode.P_INT);
         }
-        
+
         /** {@inheritDoc} */
         @Override
         protected void write0(RowAssembler writer, Object obj) {
             final int val = (int) varHandle.get(obj);
-            
+
             writer.appendInt(val);
         }
-        
+
         /** {@inheritDoc} */
         @Override
         protected void read0(Row reader, Object obj) {
             final int val = reader.intValue(colIdx);
-            
+
             varHandle.set(obj, val);
         }
     }
-    
+
     /**
      * Accessor for a field of primitive {@code long} type.
      */
@@ -655,24 +721,24 @@ abstract class FieldAccessor {
         LongPrimitiveAccessor(VarHandle varHandle, int colIdx) {
             super(Objects.requireNonNull(varHandle), colIdx, BinaryMode.P_LONG);
         }
-        
+
         /** {@inheritDoc} */
         @Override
         protected void write0(RowAssembler writer, Object obj) {
             final long val = (long) varHandle.get(obj);
-            
+
             writer.appendLong(val);
         }
-        
+
         /** {@inheritDoc} */
         @Override
         protected void read0(Row reader, Object obj) {
             final long val = reader.longValue(colIdx);
-            
+
             varHandle.set(obj, val);
         }
     }
-    
+
     /**
      * Accessor for a field of primitive {@code float} type.
      */
@@ -686,24 +752,24 @@ abstract class FieldAccessor {
         FloatPrimitiveAccessor(VarHandle varHandle, int colIdx) {
             super(Objects.requireNonNull(varHandle), colIdx, BinaryMode.P_FLOAT);
         }
-        
+
         /** {@inheritDoc} */
         @Override
         protected void write0(RowAssembler writer, Object obj) {
             final float val = (float) varHandle.get(obj);
-            
+
             writer.appendFloat(val);
         }
-        
+
         /** {@inheritDoc} */
         @Override
         protected void read0(Row reader, Object obj) {
             final float val = reader.floatValue(colIdx);
-            
+
             varHandle.set(obj, val);
         }
     }
-    
+
     /**
      * Accessor for a field of primitive {@code double} type.
      */
@@ -717,24 +783,24 @@ abstract class FieldAccessor {
         DoublePrimitiveAccessor(VarHandle varHandle, int colIdx) {
             super(Objects.requireNonNull(varHandle), colIdx, BinaryMode.P_DOUBLE);
         }
-        
+
         /** {@inheritDoc} */
         @Override
         protected void write0(RowAssembler writer, Object obj) {
             final double val = (double) varHandle.get(obj);
-            
+
             writer.appendDouble(val);
         }
-        
+
         /** {@inheritDoc} */
         @Override
         protected void read0(Row reader, Object obj) {
             final double val = reader.doubleValue(colIdx);
-            
+
             varHandle.set(obj, val);
         }
     }
-    
+
     /**
      * Accessor for a field of reference type.
      */
@@ -749,29 +815,76 @@ abstract class FieldAccessor {
         ReferenceFieldAccessor(VarHandle varHandle, int colIdx, BinaryMode mode) {
             super(Objects.requireNonNull(varHandle), colIdx, mode);
         }
-        
+
         /** {@inheritDoc} */
         @Override
         protected void write0(RowAssembler writer, Object obj) {
             assert obj != null;
             assert writer != null;
-            
+
             Object val = varHandle.get(obj);
-            
-            if (val == null) {
-                writer.appendNull();
-                
-                return;
-            }
-            
+
             writeRefObject(val, writer, mode);
         }
-        
+
         /** {@inheritDoc} */
         @Override
         public void read0(Row reader, Object obj) {
             Object val = readRefValue(reader, colIdx, mode);
-            
+
+            varHandle.set(obj, val);
+        }
+    }
+
+    /**
+     * Accessor for a field of reference type.
+     */
+    private static class PojoFieldAccessor extends FieldAccessor {
+        /**
+         * Constructor.
+         *
+         * @param varHandle VarHandle.
+         * @param colIdx    Column index.
+         */
+        PojoFieldAccessor(VarHandle varHandle, int colIdx) {
+            super(Objects.requireNonNull(varHandle), colIdx, BinaryMode.BYTE_ARR);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        protected void write0(RowAssembler writer, Object obj) throws MarshallerException {
+            assert obj != null;
+
+            Object val = varHandle.get(obj);
+
+            if (val != null) {
+                ByteArrayOutputStream out = new ByteArrayOutputStream(512);
+
+                try (ObjectOutputStream oos = new ObjectOutputStream(out)) {
+                    oos.writeObject(val);
+                } catch (IOException e) {
+                    throw new MarshallerException(e);
+                }
+
+                val = out.toByteArray();
+            }
+
+            writeRefObject(val, writer, BinaryMode.BYTE_ARR);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void read0(Row reader, Object obj) throws MarshallerException {
+            Object val = readRefValue(reader, colIdx, BinaryMode.BYTE_ARR);
+
+            if (val != null) {
+                try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream((byte[]) val))) {
+                    val = ois.readObject();
+                } catch (IOException | ClassNotFoundException e) {
+                    throw new MarshallerException(e);
+                }
+            }
+
             varHandle.set(obj, val);
         }
     }
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/Marshaller.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/Marshaller.java
index cbe2137..e469f7f 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/Marshaller.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/Marshaller.java
@@ -17,9 +17,11 @@
 
 package org.apache.ignite.internal.schema.marshaller.reflection;
 
+import java.util.Arrays;
 import java.util.Objects;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.Columns;
+import org.apache.ignite.internal.schema.SchemaMismatchException;
 import org.apache.ignite.internal.schema.marshaller.BinaryMode;
 import org.apache.ignite.internal.schema.marshaller.MarshallerException;
 import org.apache.ignite.internal.schema.marshaller.MarshallerUtil;
@@ -43,33 +45,39 @@ public abstract class Marshaller {
      */
     public static <T> Marshaller createMarshaller(Column[] cols, Mapper<T> mapper) {
         final BinaryMode mode = MarshallerUtil.mode(mapper.targetType());
-        
-        if (mode != null) {
-            final Column col = cols[0];
-            
-            assert cols.length == 1;
-            assert mode.typeSpec() == col.type().spec() : "Target type is not compatible.";
+
+        if (mode != BinaryMode.POJO || mapper.mappedColumn() != null) {
+            Column col = (mapper.mappedColumn() == null && cols.length == 1) ? cols[0] :
+                    Arrays.stream(cols).filter(c -> c.name().equals(mapper.mappedColumn())).findFirst().orElseThrow(() ->
+                    new SchemaMismatchException("Failed to map object to a single column: mappedColumn=" + mapper.mappedColumn()));
+
+            if (mode.typeSpec() != col.type().spec()) {
+                throw new SchemaMismatchException(
+                        String.format("Object can't be mapped to a column of incompatible type: columnType=%s, mappedType=%s",
+                                col.type().spec(), mapper.targetType().getName()));
+            }
+
             assert !mapper.targetType().isPrimitive() : "Non-nullable types are not allowed.";
-            
+
             return new SimpleMarshaller(FieldAccessor.createIdentityAccessor(col, col.schemaIndex(), mode));
         }
-        
+
         FieldAccessor[] fieldAccessors = new FieldAccessor[cols.length];
-        
+
         // Build handlers.
         for (int i = 0; i < cols.length; i++) {
             final Column col = cols[i];
-            
-            String fieldName = mapper.columnToField(col.name());
-            
+
+            String fieldName = mapper.mappedField(col.name());
+
             // TODO: IGNITE-15785 validate key marshaller has no NoopAccessors.
             fieldAccessors[i] = (fieldName == null) ? FieldAccessor.noopAccessor(col) :
                     FieldAccessor.create(mapper.targetType(), fieldName, col, col.schemaIndex());
         }
-        
+
         return new PojoMarshaller(new ObjectFactory<>(mapper.targetType()), fieldAccessors);
     }
-    
+
     /**
      * Creates a marshaller for class.
      *
@@ -81,29 +89,29 @@ public abstract class Marshaller {
     @Deprecated
     public static Marshaller createMarshaller(Columns cols, Class<? extends Object> cls) {
         final BinaryMode mode = MarshallerUtil.mode(cls);
-        
-        if (mode != null) {
+
+        if (mode != BinaryMode.POJO) {
             final Column col = cols.column(0);
-            
+
             assert cols.length() == 1;
             assert mode.typeSpec() == col.type().spec() : "Target type is not compatible.";
             assert !cls.isPrimitive() : "Non-nullable types are not allowed.";
-            
+
             return new SimpleMarshaller(FieldAccessor.createIdentityAccessor(col, col.schemaIndex(), mode));
         }
-        
+
         FieldAccessor[] fieldAccessors = new FieldAccessor[cols.length()];
-        
+
         // Build accessors
         for (int i = 0; i < cols.length(); i++) {
             final Column col = cols.column(i);
-            
+
             fieldAccessors[i] = FieldAccessor.create(cls, col.name(), col, col.schemaIndex());
         }
-        
+
         return new PojoMarshaller(new ObjectFactory<>(cls), fieldAccessors);
     }
-    
+
     /**
      * Reads object field value.
      *
@@ -112,7 +120,7 @@ public abstract class Marshaller {
      * @return Field value.
      */
     public abstract @Nullable Object value(Object obj, int fldIdx);
-    
+
     /**
      * Reads object from a row.
      *
@@ -121,7 +129,7 @@ public abstract class Marshaller {
      * @throws MarshallerException If failed.
      */
     public abstract Object readObject(Row reader) throws MarshallerException;
-    
+
     /**
      * Write an object to a row.
      *
@@ -130,14 +138,14 @@ public abstract class Marshaller {
      * @throws MarshallerException If failed.
      */
     public abstract void writeObject(Object obj, RowAssembler writer) throws MarshallerException;
-    
+
     /**
      * Marshaller for objects of natively supported types.
      */
     static class SimpleMarshaller extends Marshaller {
         /** Identity accessor. */
         private final FieldAccessor fieldAccessor;
-        
+
         /**
          * Creates a marshaller for objects of natively supported type.
          *
@@ -146,40 +154,40 @@ public abstract class Marshaller {
         SimpleMarshaller(FieldAccessor fieldAccessor) {
             this.fieldAccessor = fieldAccessor;
         }
-        
+
         /** {@inheritDoc} */
         @Override
         public @Nullable
         Object value(Object obj, int fldIdx) {
             assert fldIdx == 0;
-            
+
             return fieldAccessor.value(obj);
         }
-        
+
         /** {@inheritDoc} */
         @Override
-        public Object readObject(Row reader) {
+        public Object readObject(Row reader) throws MarshallerException {
             return fieldAccessor.read(reader);
         }
-        
-        
+
+
         /** {@inheritDoc} */
         @Override
         public void writeObject(Object obj, RowAssembler writer) throws MarshallerException {
             fieldAccessor.write(writer, obj);
         }
     }
-    
+
     /**
      * Marshaller for POJOs.
      */
     static class PojoMarshaller extends Marshaller {
         /** Field accessors for mapped columns. Array has same size and order as columns. */
         private final FieldAccessor[] fieldAccessors;
-        
+
         /** Object factory. */
         private final Factory<?> factory;
-        
+
         /**
          * Creates a marshaller for POJOs.
          *
@@ -191,26 +199,26 @@ public abstract class Marshaller {
             this.fieldAccessors = fieldAccessors;
             this.factory = Objects.requireNonNull(factory);
         }
-        
+
         /** {@inheritDoc} */
         @Override
         public @Nullable
         Object value(Object obj, int fldIdx) {
             return fieldAccessors[fldIdx].value(obj);
         }
-        
+
         /** {@inheritDoc} */
         @Override
         public Object readObject(Row reader) throws MarshallerException {
             final Object obj = factory.create();
-            
+
             for (int fldIdx = 0; fldIdx < fieldAccessors.length; fldIdx++) {
                 fieldAccessors[fldIdx].read(reader, obj);
             }
-            
+
             return obj;
         }
-        
+
         /** {@inheritDoc} */
         @Override
         public void writeObject(Object obj, RowAssembler writer)
diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/KvMarshallerTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/KvMarshallerTest.java
index 0a024ad..0a18aff 100644
--- a/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/KvMarshallerTest.java
+++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/KvMarshallerTest.java
@@ -47,6 +47,10 @@ import com.facebook.presto.bytecode.MethodDefinition;
 import com.facebook.presto.bytecode.ParameterizedType;
 import com.facebook.presto.bytecode.Variable;
 import com.facebook.presto.bytecode.expression.BytecodeExpressions;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -89,27 +93,27 @@ public class KvMarshallerTest {
     private static List<MarshallerFactory> marshallerFactoryProvider() {
         return List.of(new ReflectionMarshallerFactory());
     }
-    
+
     /** Random. */
     private Random rnd;
-    
+
     /**
      * Init test.
      */
     @BeforeEach
     public void initRandom() {
         long seed = System.currentTimeMillis();
-        
+
         System.out.println("Using seed: " + seed + "L;");
-        
+
         rnd = new Random(seed);
     }
-    
+
     @TestFactory
     public Stream<DynamicNode> basicTypes() {
         NativeType[] types = new NativeType[]{INT8, INT16, INT32, INT64, FLOAT, DOUBLE, UUID, STRING, BYTES,
                 NativeTypes.bitmaskOf(5), NativeTypes.numberOf(42), NativeTypes.decimalOf(12, 3)};
-        
+
         return marshallerFactoryProvider().stream().map(factory ->
                 dynamicContainer(
                         factory.getClass().getSimpleName(),
@@ -118,7 +122,7 @@ public class KvMarshallerTest {
                                 Stream.of(types).map(type ->
                                         dynamicTest("testBasicTypes(" + type.spec().name() + ')', () -> checkBasicType(factory, type, type))
                                 ),
-                                
+
                                 // Test pairs of mixed types.
                                 Stream.of(
                                         dynamicTest("testMixTypes 1", () -> checkBasicType(factory, INT64, INT32)),
@@ -132,57 +136,57 @@ public class KvMarshallerTest {
                         )
                 ));
     }
-    
+
     @ParameterizedTest
     @MethodSource("marshallerFactoryProvider")
     public void pojoWithFieldsOfAllTypes(MarshallerFactory factory) throws MarshallerException {
         Column[] cols = columnsAllTypes();
-        
+
         SchemaDescriptor schema = new SchemaDescriptor(1, cols, cols);
-        
+
         final TestObjectWithAllTypes key = TestObjectWithAllTypes.randomObject(rnd);
         final TestObjectWithAllTypes val = TestObjectWithAllTypes.randomObject(rnd);
-        
+
         KvMarshaller<TestObjectWithAllTypes, TestObjectWithAllTypes> marshaller =
                 factory.create(schema, TestObjectWithAllTypes.class, TestObjectWithAllTypes.class);
-        
+
         BinaryRow row = marshaller.marshal(key, val);
-        
+
         TestObjectWithAllTypes restoredVal = marshaller.unmarshalValue(new Row(schema, row));
         TestObjectWithAllTypes restoredKey = marshaller.unmarshalKey(new Row(schema, row));
-        
+
         assertTrue(key.getClass().isInstance(restoredKey));
         assertTrue(val.getClass().isInstance(restoredVal));
-        
+
         assertEquals(key, restoredKey);
         assertEquals(val, restoredVal);
     }
-    
+
     @ParameterizedTest
     @MethodSource("marshallerFactoryProvider")
     public void narrowType(MarshallerFactory factory) throws MarshallerException {
         Column[] cols = columnsAllTypes();
-        
+
         SchemaDescriptor schema = new SchemaDescriptor(1, cols, cols);
-        
+
         KvMarshaller<TestTruncatedObject, TestTruncatedObject> marshaller =
                 factory.create(schema, TestTruncatedObject.class, TestTruncatedObject.class);
-        
+
         final TestTruncatedObject key = TestTruncatedObject.randomObject(rnd);
         final TestTruncatedObject val = TestTruncatedObject.randomObject(rnd);
-        
+
         BinaryRow row = marshaller.marshal(key, val);
-        
+
         Object restoredVal = marshaller.unmarshalValue(new Row(schema, row));
         Object restoredKey = marshaller.unmarshalKey(new Row(schema, row));
-        
+
         assertTrue(key.getClass().isInstance(restoredKey));
         assertTrue(val.getClass().isInstance(restoredVal));
-        
+
         assertEquals(key, restoredKey);
         assertEquals(val, restoredVal);
     }
-    
+
     @ParameterizedTest
     @MethodSource("marshallerFactoryProvider")
     public void wideType(MarshallerFactory factory) throws MarshallerException {
@@ -191,43 +195,43 @@ public class KvMarshallerTest {
                 new Column("primitiveDoubleCol", DOUBLE, false),
                 new Column("stringCol", STRING, true),
         };
-        
+
         SchemaDescriptor schema = new SchemaDescriptor(1, cols, cols);
-        
+
         KvMarshaller<TestObjectWithAllTypes, TestObjectWithAllTypes> marshaller =
                 factory.create(schema, TestObjectWithAllTypes.class, TestObjectWithAllTypes.class);
-        
+
         final TestObjectWithAllTypes key = TestObjectWithAllTypes.randomObject(rnd);
         final TestObjectWithAllTypes val = TestObjectWithAllTypes.randomObject(rnd);
-        
+
         BinaryRow row = marshaller.marshal(key, val);
-        
+
         TestObjectWithAllTypes restoredVal = marshaller.unmarshalValue(new Row(schema, row));
         TestObjectWithAllTypes restoredKey = marshaller.unmarshalKey(new Row(schema, row));
-        
+
         assertTrue(key.getClass().isInstance(restoredKey));
         assertTrue(val.getClass().isInstance(restoredVal));
-        
+
         TestObjectWithAllTypes expectedKey = new TestObjectWithAllTypes();
         expectedKey.setPrimitiveLongCol(key.getPrimitiveLongCol());
         expectedKey.setPrimitiveDoubleCol(key.getPrimitiveDoubleCol());
         expectedKey.setStringCol(key.getStringCol());
-        
+
         TestObjectWithAllTypes expectedVal = new TestObjectWithAllTypes();
         expectedVal.setPrimitiveLongCol(val.getPrimitiveLongCol());
         expectedVal.setPrimitiveDoubleCol(val.getPrimitiveDoubleCol());
         expectedVal.setStringCol(val.getStringCol());
-        
+
         assertEquals(expectedKey, restoredKey);
         assertEquals(expectedVal, restoredVal);
-        
+
         // Check non-mapped fields has default values.
         assertNull(restoredKey.getUuidCol());
         assertNull(restoredVal.getUuidCol());
         assertEquals(0, restoredKey.getPrimitiveIntCol());
         assertEquals(0, restoredVal.getPrimitiveIntCol());
     }
-    
+
     @ParameterizedTest
     @MethodSource("marshallerFactoryProvider")
     public void columnNameMapping(MarshallerFactory factory) throws MarshallerException {
@@ -238,35 +242,35 @@ public class KvMarshallerTest {
                         new Column("col2", INT64, true),
                         new Column("col3", STRING, false)
                 });
-        
+
         Mapper<TestKeyObject> keyMapper = Mapper.builderFor(TestKeyObject.class)
                 .map("id", "key")
                 .build();
-        
+
         Mapper<TestObject> valMapper = Mapper.builderFor(TestObject.class)
                 .map("longCol", "col1")
                 .map("stringCol", "col3")
                 .build();
-        
+
         KvMarshaller<TestKeyObject, TestObject> marshaller = factory.create(schema, keyMapper, valMapper);
-        
+
         final TestKeyObject key = TestKeyObject.randomObject(rnd);
         final TestObject val = TestObject.randomObject(rnd);
-        
+
         BinaryRow row = marshaller.marshal(key, val);
-        
+
         Object restoredVal = marshaller.unmarshalValue(new Row(schema, row));
         Object restoredKey = marshaller.unmarshalKey(new Row(schema, row));
-        
+
         assertTrue(key.getClass().isInstance(restoredKey));
         assertTrue(val.getClass().isInstance(restoredVal));
-        
+
         val.longCol2 = null;
-        
+
         assertEquals(key, restoredKey);
         assertEquals(val, restoredVal);
     }
-    
+
     @ParameterizedTest
     @MethodSource("marshallerFactoryProvider")
     public void classWithWrongFieldType(MarshallerFactory factory) {
@@ -274,22 +278,22 @@ public class KvMarshallerTest {
                 new Column("bitmaskCol", NativeTypes.bitmaskOf(42), true),
                 new Column("shortCol", UUID, true)
         };
-        
+
         SchemaDescriptor schema = new SchemaDescriptor(1, cols, cols);
-        
+
         KvMarshaller<TestObjectWithAllTypes, TestObjectWithAllTypes> marshaller =
                 factory.create(schema, TestObjectWithAllTypes.class, TestObjectWithAllTypes.class);
-        
+
         final TestObjectWithAllTypes key = TestObjectWithAllTypes.randomObject(rnd);
         final TestObjectWithAllTypes val = TestObjectWithAllTypes.randomObject(rnd);
-        
+
         assertThrows(
                 MarshallerException.class,
                 () -> marshaller.marshal(key, val),
                 "Failed to write field [name=shortCol]"
         );
     }
-    
+
     @ParameterizedTest
     @MethodSource("marshallerFactoryProvider")
     public void classWithIncorrectBitmaskSize(MarshallerFactory factory) {
@@ -297,22 +301,22 @@ public class KvMarshallerTest {
                 new Column("primitiveLongCol", INT64, false),
                 new Column("bitmaskCol", NativeTypes.bitmaskOf(9), true),
         };
-        
+
         SchemaDescriptor schema = new SchemaDescriptor(1, cols, cols);
-        
+
         KvMarshaller<TestObjectWithAllTypes, TestObjectWithAllTypes> marshaller =
                 factory.create(schema, TestObjectWithAllTypes.class, TestObjectWithAllTypes.class);
-        
+
         final TestObjectWithAllTypes key = TestObjectWithAllTypes.randomObject(rnd);
         final TestObjectWithAllTypes val = TestObjectWithAllTypes.randomObject(rnd);
-        
+
         assertThrows(
                 MarshallerException.class,
                 () -> marshaller.marshal(key, val),
                 "Failed to write field [name=bitmaskCol]"
         );
     }
-    
+
     @ParameterizedTest
     @MethodSource("marshallerFactoryProvider")
     public void classWithPrivateConstructor(MarshallerFactory factory) throws MarshallerException {
@@ -320,39 +324,39 @@ public class KvMarshallerTest {
                 new Column("primLongCol", INT64, false),
                 new Column("primIntCol", INT32, false),
         };
-        
+
         SchemaDescriptor schema = new SchemaDescriptor(1, cols, cols);
-        
+
         KvMarshaller<TestObjectWithPrivateConstructor, TestObjectWithPrivateConstructor> marshaller =
                 factory.create(schema, TestObjectWithPrivateConstructor.class, TestObjectWithPrivateConstructor.class);
-        
+
         final TestObjectWithPrivateConstructor key = TestObjectWithPrivateConstructor.randomObject(rnd);
         final TestObjectWithPrivateConstructor val = TestObjectWithPrivateConstructor.randomObject(rnd);
-        
+
         BinaryRow row = marshaller.marshal(key, val);
-        
+
         Object key1 = marshaller.unmarshalKey(new Row(schema, row));
         Object val1 = marshaller.unmarshalValue(new Row(schema, row));
-        
+
         assertTrue(key.getClass().isInstance(key1));
         assertTrue(val.getClass().isInstance(val1));
-        
+
         assertEquals(key, key);
         assertEquals(val, val1);
     }
-    
+
     @ParameterizedTest
     @MethodSource("marshallerFactoryProvider")
     public void classWithNoDefaultConstructor(MarshallerFactory factory) {
         Column[] cols = new Column[]{
                 new Column("primLongCol", INT64, false),
         };
-        
+
         SchemaDescriptor schema = new SchemaDescriptor(1, cols, cols);
-        
+
         final Object key = TestObjectWithNoDefaultConstructor.randomObject(rnd);
         final Object val = TestObjectWithNoDefaultConstructor.randomObject(rnd);
-        
+
         assertThrows(IgniteInternalException.class, () -> factory.create(schema, key.getClass(), val.getClass()));
     }
 
@@ -362,64 +366,116 @@ public class KvMarshallerTest {
         Column[] cols = new Column[]{
                 new Column("primLongCol", INT64, false),
         };
-        
+
         SchemaDescriptor schema = new SchemaDescriptor(1, cols, cols);
-        
+
         final ObjectFactory<PrivateTestObject> objFactory = new ObjectFactory<>(PrivateTestObject.class);
         final KvMarshaller<PrivateTestObject, PrivateTestObject> marshaller =
                 factory.create(schema, PrivateTestObject.class, PrivateTestObject.class);
-        
+
         final PrivateTestObject key = PrivateTestObject.randomObject(rnd);
         final PrivateTestObject val = PrivateTestObject.randomObject(rnd);
-        
+
         BinaryRow row = marshaller.marshal(key, objFactory.create());
-        
+
         Object key1 = marshaller.unmarshalKey(new Row(schema, row));
         Object val1 = marshaller.unmarshalValue(new Row(schema, row));
-        
+
         assertTrue(key.getClass().isInstance(key1));
         assertTrue(val.getClass().isInstance(val1));
     }
-    
+
     @ParameterizedTest
     @MethodSource("marshallerFactoryProvider")
     public void classLoader(MarshallerFactory factory) throws MarshallerException {
         final ClassLoader loader = Thread.currentThread().getContextClassLoader();
         try {
             Thread.currentThread().setContextClassLoader(new DynamicClassLoader(getClass().getClassLoader()));
-            
+
             Column[] keyCols = new Column[]{
                     new Column("key", INT64, false)
             };
-            
+
             Column[] valCols = new Column[]{
                     new Column("col0", INT64, false),
                     new Column("col1", INT64, false),
                     new Column("col2", INT64, false),
             };
-            
+
             SchemaDescriptor schema = new SchemaDescriptor(1, keyCols, valCols);
-            
+
             final Class<?> valClass = createGeneratedObjectClass();
             final ObjectFactory<?> objFactory = new ObjectFactory<>(valClass);
-            
+
             KvMarshaller<Long, Object> marshaller = factory.create(schema, Long.class, (Class<Object>) valClass);
-            
+
             final Long key = rnd.nextLong();
-            
+
             BinaryRow row = marshaller.marshal(key, objFactory.create());
-            
+
             Long key1 = marshaller.unmarshalKey(new Row(schema, row));
             Object val1 = marshaller.unmarshalValue(new Row(schema, row));
-            
+
             assertTrue(valClass.isInstance(val1));
-            
+
             assertEquals(key, key1);
         } finally {
             Thread.currentThread().setContextClassLoader(loader);
         }
     }
-    
+
+    @ParameterizedTest
+    @MethodSource("marshallerFactoryProvider")
+    public void pojoMapping(MarshallerFactory factory) throws MarshallerException, IOException {
+        final SchemaDescriptor schema = new SchemaDescriptor(
+                1,
+                new Column[]{new Column("key", INT64, false)},
+                new Column[]{new Column("val", BYTES, true),
+                });
+
+        final TestPojo pojo = new TestPojo(42);
+        final byte[] serializedPojo = serializeObject(pojo);
+
+        final KvMarshaller<Long, TestPojo> marshaller1 = factory.create(schema,
+                Mapper.of("key", Long.class), Mapper.of("val", TestPojo.class));
+        final KvMarshaller<Long, byte[]> marshaller2 = factory.create(schema,
+                Mapper.of("key", Long.class), Mapper.of("val", byte[].class));
+        final KvMarshaller<Long, TestPojoWrapper> marshaller3 = factory.create(schema,
+                Mapper.of("key", Long.class), Mapper.builderFor(TestPojoWrapper.class).map("pojoField", "val").build());
+        final KvMarshaller<Long, TestPojoWrapper> marshaller4 = factory.create(schema,
+                Mapper.of("key", Long.class), Mapper.builderFor(TestPojoWrapper.class).map("rawField", "val").build());
+
+        BinaryRow row = marshaller1.marshal(1L, pojo);
+        BinaryRow row2 = marshaller2.marshal(1L, serializedPojo);
+        BinaryRow row3= marshaller3.marshal(1L, new TestPojoWrapper(pojo));
+        BinaryRow row4 = marshaller4.marshal(1L, new TestPojoWrapper(serializedPojo));
+
+        // Verify all rows are equivalent.
+        assertArrayEquals(row.bytes(), row2.bytes());
+        assertArrayEquals(row.bytes(), row3.bytes());
+        assertArrayEquals(row.bytes(), row4.bytes());
+
+        // Check key.
+        assertEquals(1L, marshaller1.unmarshalKey(new Row(schema, row)));
+        assertEquals(1L, marshaller2.unmarshalKey(new Row(schema, row)));
+        assertEquals(1L, marshaller3.unmarshalKey(new Row(schema, row)));
+        assertEquals(1L, marshaller4.unmarshalKey(new Row(schema, row)));
+
+        // Check values.
+        assertEquals(pojo,  marshaller1.unmarshalValue(new Row(schema, row)));
+        assertArrayEquals(serializedPojo,  marshaller2.unmarshalValue(new Row(schema, row)));
+        assertEquals(new TestPojoWrapper(pojo),  marshaller3.unmarshalValue(new Row(schema, row)));
+        assertEquals(new TestPojoWrapper(serializedPojo),  marshaller4.unmarshalValue(new Row(schema, row)));
+    }
+
+    public byte[] serializeObject(TestPojo obj) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
+        try (ObjectOutputStream dos = new ObjectOutputStream(baos)) {
+            dos.writeObject(obj);
+        }
+        return baos.toByteArray();
+    }
+
     /**
      * Generate random key-value pair of given types and check serialization and deserialization works fine.
      *
@@ -432,26 +488,26 @@ public class KvMarshallerTest {
             NativeType valType) throws MarshallerException {
         final Object key = generateRandomValue(keyType);
         final Object val = generateRandomValue(valType);
-        
+
         Column[] keyCols = new Column[]{new Column("key", keyType, false)};
         Column[] valCols = new Column[]{new Column("val", valType, false)};
-        
+
         SchemaDescriptor schema = new SchemaDescriptor(1, keyCols, valCols);
-        
+
         KvMarshaller<Object, Object> marshaller = factory.create(schema, (Class<Object>) key.getClass(), (Class<Object>) val.getClass());
-        
+
         BinaryRow row = marshaller.marshal(key, val);
-        
+
         Object key1 = marshaller.unmarshalKey(new Row(schema, row));
         Object val1 = marshaller.unmarshalValue(new Row(schema, row));
-        
+
         assertTrue(key.getClass().isInstance(key1));
         assertTrue(val.getClass().isInstance(val1));
-        
+
         compareObjects(keyType, key, key);
         compareObjects(valType, val, val1);
     }
-    
+
     /**
      * Compare object regarding NativeType.
      *
@@ -466,7 +522,7 @@ public class KvMarshallerTest {
             assertEquals(exp, act);
         }
     }
-    
+
     /**
      * Generates random value of given type.
      *
@@ -475,7 +531,7 @@ public class KvMarshallerTest {
     private Object generateRandomValue(NativeType type) {
         return SchemaTestUtils.generateRandomValue(rnd, type);
     }
-    
+
     /**
      * Generate class for test objects.
      *
@@ -484,40 +540,40 @@ public class KvMarshallerTest {
     private Class<?> createGeneratedObjectClass() {
         final String packageName = getClass().getPackageName();
         final String className = "GeneratedTestObject";
-        
+
         final ClassDefinition classDef = new ClassDefinition(
                 EnumSet.of(Access.PUBLIC),
                 packageName.replace('.', '/') + '/' + className,
                 ParameterizedType.type(Object.class)
         );
         classDef.declareAnnotation(Generated.class).setValue("value", getClass().getCanonicalName());
-        
+
         for (int i = 0; i < 3; i++) {
             classDef.declareField(EnumSet.of(Access.PRIVATE), "col" + i, ParameterizedType.type(long.class));
         }
-        
+
         // Build constructor.
         final MethodDefinition methodDef = classDef.declareConstructor(EnumSet.of(Access.PUBLIC));
         final Variable rnd = methodDef.getScope().declareVariable(Random.class, "rnd");
-        
+
         BytecodeBlock body = methodDef.getBody()
                 .append(methodDef.getThis())
                 .invokeConstructor(classDef.getSuperClass())
                 .append(rnd.set(BytecodeExpressions.newInstance(Random.class)));
-        
+
         for (int i = 0; i < 3; i++) {
             body.append(methodDef.getThis().setField("col" + i, rnd.invoke("nextLong", long.class).cast(long.class)));
         }
-        
+
         body.ret();
-        
+
         return ClassGenerator.classGenerator(Thread.currentThread().getContextClassLoader())
                 .fakeLineNumbers(true)
                 .runAsmVerifier(true)
                 .dumpRawBytecode(true)
                 .defineClass(classDef, Object.class);
     }
-    
+
     private Column[] columnsAllTypes() {
         Column[] cols = new Column[]{
                 new Column("primitiveByteCol", INT8, false, () -> (byte) 0x42),
@@ -526,7 +582,7 @@ public class KvMarshallerTest {
                 new Column("primitiveLongCol", INT64, false),
                 new Column("primitiveFloatCol", FLOAT, false),
                 new Column("primitiveDoubleCol", DOUBLE, false),
-                
+
                 new Column("byteCol", INT8, true),
                 new Column("shortCol", INT16, true),
                 new Column("intCol", INT32, true),
@@ -534,12 +590,12 @@ public class KvMarshallerTest {
                 new Column("nullLongCol", INT64, true),
                 new Column("floatCol", FLOAT, true),
                 new Column("doubleCol", DOUBLE, true),
-                
+
                 new Column("dateCol", DATE, true),
                 new Column("timeCol", time(), true),
                 new Column("dateTimeCol", datetime(), true),
                 new Column("timestampCol", timestamp(), true),
-                
+
                 new Column("uuidCol", UUID, true),
                 new Column("bitmaskCol", NativeTypes.bitmaskOf(42), true),
                 new Column("stringCol", STRING, true),
@@ -553,11 +609,11 @@ public class KvMarshallerTest {
                 .collect(Collectors.toSet());
         Set<NativeTypeSpec> missedTypes = Arrays.stream(NativeTypeSpec.values())
                 .filter(t -> !testedTypes.contains(t)).collect(Collectors.toSet());
-        
+
         assertEquals(Collections.emptySet(), missedTypes);
         return cols;
     }
-    
+
     /**
      * Test object.
      */
@@ -565,14 +621,14 @@ public class KvMarshallerTest {
     public static class TestKeyObject {
         static TestKeyObject randomObject(Random rnd) {
             final TestKeyObject obj = new TestKeyObject();
-            
+
             obj.id = rnd.nextLong();
-            
+
             return obj;
         }
-        
+
         private long id;
-        
+
         @Override
         public boolean equals(Object o) {
             if (this == o) {
@@ -584,14 +640,14 @@ public class KvMarshallerTest {
             TestKeyObject that = (TestKeyObject) o;
             return id == that.id;
         }
-        
+
         @Override
         public int hashCode() {
             return Objects.hash(id);
         }
     }
-    
-    
+
+
     /**
      * Test object.
      */
@@ -599,43 +655,43 @@ public class KvMarshallerTest {
     public static class TestObject {
         static TestObject randomObject(Random rnd) {
             final TestObject obj = new TestObject();
-            
+
             obj.longCol = rnd.nextLong();
             obj.longCol2 = rnd.nextLong();
             obj.stringCol = IgniteTestUtils.randomString(rnd, 100);
-            
+
             return obj;
         }
-        
+
         private long longCol;
-        
+
         private Long longCol2;
-        
+
         private String stringCol;
-        
+
         @Override
         public boolean equals(Object o) {
             if (this == o) {
                 return true;
             }
-            
+
             if (o == null || getClass() != o.getClass()) {
                 return false;
             }
-            
+
             TestObject that = (TestObject) o;
-            
+
             return longCol == that.longCol
                     && Objects.equals(longCol2, that.longCol2)
                     && Objects.equals(stringCol, that.stringCol);
         }
-        
+
         @Override
         public int hashCode() {
             return Objects.hash(longCol, longCol2, stringCol);
         }
     }
-    
+
     /**
      * Test object.
      */
@@ -643,30 +699,30 @@ public class KvMarshallerTest {
     public static class TestTruncatedObject {
         static TestTruncatedObject randomObject(Random rnd) {
             final TestTruncatedObject obj = new TestTruncatedObject();
-            
+
             obj.primitiveIntCol = rnd.nextInt();
             obj.primitiveLongCol = rnd.nextLong();
             obj.primitiveDoubleCol = rnd.nextDouble();
-            
+
             obj.uuidCol = java.util.UUID.randomUUID();
             obj.stringCol = IgniteTestUtils.randomString(rnd, 100);
-            
+
             return obj;
         }
-        
+
         // Primitive typed
         private int primitiveIntCol;
-        
+
         private long primitiveLongCol;
-        
+
         private float primitiveFloatCol;
-        
+
         private double primitiveDoubleCol;
-        
+
         private String stringCol;
-        
+
         private java.util.UUID uuidCol;
-        
+
         /** {@inheritDoc} */
         @Override
         public boolean equals(Object o) {
@@ -676,9 +732,9 @@ public class KvMarshallerTest {
             if (o == null || getClass() != o.getClass()) {
                 return false;
             }
-            
+
             TestTruncatedObject object = (TestTruncatedObject) o;
-            
+
             return primitiveIntCol == object.primitiveIntCol
                     && primitiveLongCol == object.primitiveLongCol
                     && Float.compare(object.primitiveFloatCol, primitiveFloatCol) == 0
@@ -686,14 +742,14 @@ public class KvMarshallerTest {
                     && Objects.equals(stringCol, ((TestTruncatedObject) o).stringCol)
                     && Objects.equals(uuidCol, ((TestTruncatedObject) o).uuidCol);
         }
-        
+
         /** {@inheritDoc} */
         @Override
         public int hashCode() {
             return 42;
         }
     }
-    
+
     /**
      * Test object without default constructor.
      */
@@ -705,41 +761,114 @@ public class KvMarshallerTest {
         static PrivateTestObject randomObject(Random rnd) {
             return new PrivateTestObject(rnd.nextInt());
         }
-        
+
         /** Value. */
         private long primLongCol;
-        
+
         /** Constructor. */
         PrivateTestObject() {
         }
-        
+
         /**
          * Private constructor.
          */
         PrivateTestObject(long val) {
             primLongCol = val;
         }
-        
+
         /** {@inheritDoc} */
         @Override
         public boolean equals(Object o) {
             if (this == o) {
                 return true;
             }
-            
+
             if (o == null || getClass() != o.getClass()) {
                 return false;
             }
-            
+
             PrivateTestObject object = (PrivateTestObject) o;
-            
+
             return primLongCol == object.primLongCol;
         }
-        
+
         /** {@inheritDoc} */
         @Override
         public int hashCode() {
             return Objects.hash(primLongCol);
         }
     }
+
+    /**
+     * Test object represents a user object of arbitrary type.
+     */
+    static class TestPojo implements Serializable {
+        private static final long serialVersionUid = -1L;
+
+        int intField;
+
+        public TestPojo() {
+        }
+
+        public TestPojo(int intVal) {
+            this.intField = intVal;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            TestPojo testPojo = (TestPojo) o;
+            return intField == testPojo.intField;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(intField);
+        }
+    }
+
+    /**
+     * Wrapper for the {@link TestPojo}.
+     */
+    static class TestPojoWrapper {
+        TestPojo pojoField;
+
+        byte[] rawField;
+
+        public TestPojoWrapper() {
+        }
+
+        public TestPojoWrapper(TestPojo pojoField) {
+            this.pojoField = pojoField;
+        }
+
+        public TestPojoWrapper(byte[] rawField) {
+            this.rawField = rawField;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            TestPojoWrapper that = (TestPojoWrapper) o;
+            return Objects.equals(pojoField, that.pojoField) &&
+                    Arrays.equals(rawField, that.rawField);
+        }
+
+        @Override
+        public int hashCode() {
+            int result = Objects.hash(pojoField);
+            result = 31 * result + Arrays.hashCode(rawField);
+            return result;
+        }
+    }
 }
diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/MapperTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/MapperTest.java
index 1f1e90c..ddcf469 100644
--- a/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/MapperTest.java
+++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/MapperTest.java
@@ -17,8 +17,12 @@
 
 package org.apache.ignite.internal.schema.marshaller;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
+import java.util.function.Function;
+import org.apache.ignite.internal.schema.testobjects.TestOuterObject;
 import org.apache.ignite.table.mapper.Mapper;
 import org.apache.ignite.table.mapper.MapperBuilder;
 import org.junit.jupiter.api.Test;
@@ -27,49 +31,87 @@ import org.junit.jupiter.api.Test;
  * Columns mappers test.
  */
 public class MapperTest {
-    
+
     @Test
-    public void misleadingMapping() {
+    public void misleadingMapperUsage() {
         // Empty mapping.
         assertThrows(IllegalStateException.class, () -> Mapper.builderFor(TestObject.class).build());
-        
+
         // Many fields to one column.
         assertThrows(IllegalArgumentException.class, () -> Mapper.builderFor(TestObject.class)
                 .map("id", "key")
                 .map("longCol", "key"));
-        
+
         // One field to many columns.
         assertThrows(IllegalStateException.class, () -> Mapper.builderFor(TestObject.class)
                 .map("id", "key")
                 .map("id", "val1")
                 .map("stringCol", "val2")
                 .build());
-        
+
         // Mapper builder reuse fails.
         assertThrows(IllegalStateException.class, () -> {
             MapperBuilder<TestObject> builder = Mapper.builderFor(TestObject.class)
                     .map("id", "key");
-            
+
             builder.build();
-            
+
             builder.map("stringCol", "val2");
         });
     }
-    
+
+    @Test
+    public void supportedClassKinds() {
+        class LocalClass {
+            long id;
+        }
+
+        Function anonymous = (i) -> i;
+
+        Mapper.builderFor(TestOuterObject.class);
+        Mapper.builderFor(TestOuterObject.NestedObect.class);
+
+        assertThrows(IllegalArgumentException.class, () -> Mapper.builderFor(Long.class));
+        assertThrows(IllegalArgumentException.class, () -> Mapper.builderFor(TestOuterObject.InnerObject.class));
+        assertThrows(IllegalArgumentException.class, () -> Mapper.builderFor(LocalClass.class));
+        assertThrows(IllegalArgumentException.class, () -> Mapper.builderFor(anonymous.getClass()));
+
+        Mapper.of("key", Long.class);
+        Mapper.of("key", TestOuterObject.class);
+        Mapper.of("key", TestOuterObject.NestedObect.class);
+
+        assertThrows(IllegalArgumentException.class, () -> Mapper.of("key", TestOuterObject.InnerObject.class));
+        assertThrows(IllegalArgumentException.class, () -> Mapper.of("key", LocalClass.class));
+        assertThrows(IllegalArgumentException.class, () -> Mapper.of("key", anonymous.getClass()));
+    }
+
     @Test
     public void identityMapping() {
-        Mapper.identity(TestObject.class);
+        Mapper<TestObject> mapper = Mapper.of(TestObject.class);
+
+        assertNull(mapper.mappedColumn());
+        assertEquals("id", mapper.mappedField("id"));
+        assertNull(mapper.mappedField("val"));
     }
-    
+
+    @Test
+    public void basicMapping() {
+        Mapper<TestObject> mapper = Mapper.of(TestObject.class);
+
+        assertNull(mapper.mappedColumn());
+        assertEquals("id", mapper.mappedField("id"));
+        assertNull(mapper.mappedField("val"));
+    }
+
     /**
      * Test object.
      */
     @SuppressWarnings({"InstanceVariableMayNotBeInitialized", "unused"})
-    public static class TestObject {
+    static class TestObject {
         private long id;
-        
+
         private long longCol;
-        
+
         private String stringCol;
     }
 }
diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/testobjects/TestOuterObject.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/testobjects/TestOuterObject.java
new file mode 100644
index 0000000..8e1b16c
--- /dev/null
+++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/testobjects/TestOuterObject.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema.testobjects;
+
+/**
+ * Test object.
+ */
+public class TestOuterObject {
+    private long id;
+
+    public class InnerObject {
+        private long id1;
+    }
+
+    public static class NestedObect {
+        private long id2;
+    }
+}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/Example.java b/modules/table/src/test/java/org/apache/ignite/internal/table/Example.java
index b090c36..2cd31dc 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/Example.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/Example.java
@@ -22,6 +22,9 @@ import java.util.Collections;
 import java.util.List;
 import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.binary.BinaryObjects;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
 import org.apache.ignite.table.KeyValueView;
 import org.apache.ignite.table.RecordView;
@@ -35,9 +38,8 @@ import org.junit.jupiter.params.provider.MethodSource;
 /**
  * Example.
  */
-@SuppressWarnings({
-        "PMD.EmptyLineSeparatorCheck", "emptylineseparator",
-        "unused", "UnusedAssignment", "InstanceVariableMayNotBeInitialized", "JoinDeclarationAndAssignmentJava"})
+@SuppressWarnings({"PMD.EmptyLineSeparatorCheck", "emptylineseparator", "unused", "UnusedAssignment", "InstanceVariableMayNotBeInitialized",
+        "JoinDeclarationAndAssignmentJava"})
 public class Example {
     /**
      * Returns table implementation.
@@ -240,16 +242,14 @@ public class Example {
             String bankName;
         }
 
-        KeyValueView<OrderKey, OrderValue> orderKvView = t.keyValueView(Mapper.of(OrderKey.class),
-                Mapper.builderFor(OrderValue.class)
-                        .map("billingDetails", (row) -> {
-                            BinaryObject binObj = row.binaryObjectValue("conditionalDetails");
-                            int type = row.intValue("type");
+        KeyValueView<OrderKey, OrderValue> orderKvView = t
+                .keyValueView(Mapper.of("key", OrderKey.class), Mapper.builderFor(OrderValue.class).map("billingDetails", (row) -> {
+                    BinaryObject binObj = row.binaryObjectValue("conditionalDetails");
+                    int type = row.intValue("type");
 
-                            return type == 0
-                                    ? BinaryObjects.deserialize(binObj, CreditCard.class) :
-                                    BinaryObjects.deserialize(binObj, BankAccount.class);
-                        }).build());
+                    return type == 0 ? BinaryObjects.deserialize(binObj, CreditCard.class)
+                            : BinaryObjects.deserialize(binObj, BankAccount.class);
+                }).build());
 
         OrderValue ov = orderKvView.get(new OrderKey(1, 1));
 
@@ -284,9 +284,8 @@ public class Example {
         binObj = orderRecord.billingDetails;
 
         // Manual deserialization is possible as well.
-        Object billingDetails = orderRecord.type == 0
-                ? BinaryObjects.deserialize(binObj, CreditCard.class) :
-                BinaryObjects.deserialize(binObj, BankAccount.class);
+        Object billingDetails = orderRecord.type == 0 ? BinaryObjects.deserialize(binObj, CreditCard.class)
+                : BinaryObjects.deserialize(binObj, BankAccount.class);
     }
 
     /**
@@ -352,25 +351,21 @@ public class Example {
             int department;
         }
 
-        RecordView<TruncatedRecord> truncatedView = t.recordView(
-                Mapper.builderFor(TruncatedRecord.class)
-                        .map("upgradedObject", JavaPersonV2.class).build());
+        RecordView<TruncatedRecord> truncatedView = t
+                .recordView(Mapper.builderFor(TruncatedRecord.class).map("upgradedObject", JavaPersonV2.class).build());
 
         // Or we can have a custom conditional type selection.
-        RecordView<TruncatedRecord> truncatedView2 = t.recordView(
-                Mapper.builderFor(TruncatedRecord.class)
-                        .map("upgradedObject", (row) -> {
-                            BinaryObject binObj1 = row.binaryObjectValue("upgradedObject");
-                            int dept = row.intValue("department");
-
-                            return dept == 0
-                                    ? BinaryObjects.deserialize(binObj1, JavaPerson.class) :
-                                    BinaryObjects.deserialize(binObj1, JavaPersonV2.class);
-                        }).build());
+        RecordView<TruncatedRecord> truncatedView2 = t.recordView(Mapper.builderFor(TruncatedRecord.class).map("upgradedObject", (row) -> {
+            BinaryObject binObj1 = row.binaryObjectValue("upgradedObject");
+            int dept = row.intValue("department");
+
+            return dept == 0 ? BinaryObjects.deserialize(binObj1, JavaPerson.class)
+                    : BinaryObjects.deserialize(binObj1, JavaPersonV2.class);
+        }).build());
     }
 
     /**
-     * Use case 1: a simple one. The table has the structure [ [id long] // key [name varchar, lastName varchar, decimal salary, int
+     * Use case 6: a simple one. The table has the structure [ [id long] // key [name varchar, lastName varchar, decimal salary, int
      * department] // value ] We show how to use the raw TableRow and a mapped class.
      */
     @Disabled
@@ -406,7 +401,7 @@ public class Example {
     }
 
     /**
-     * Use case 1: a simple one. The table has the structure [ [byte[]] // key [name varchar, lastName varchar, decimal salary, int
+     * Use case 7: a simple one. The table has the structure [ [byte[]] // key [name varchar, lastName varchar, decimal salary, int
      * department] // value ] We show how to use the raw TableRow and a mapped class.
      */
     @Disabled
@@ -425,8 +420,50 @@ public class Example {
 
         employeeView.put(1L, BinaryObjects.wrap(new byte[0] /* serialized Employee */));
 
-        t.keyValueView(
-                Mapper.identity(Long.class),
-                Mapper.builderFor(BinaryObject.class).deserializeTo(Employee.class).build());
+        t.keyValueView(Mapper.identity(Long.class), Mapper.of("value", Employee.class));
+    }
+
+    /**
+     * Use case 8: Here we show how to use mapper for single column case.
+     */
+    @Disabled
+    @ParameterizedTest
+    @MethodSource("tableFactory")
+    public void useCase8(Table t) {
+        new SchemaDescriptor(
+                1,
+                new Column[]{new Column("key", NativeTypes.INT64, false)},
+                new Column[]{new Column("val", NativeTypes.BYTES, true),
+        });
+
+        class UserObject {
+        }
+
+        class Employee {
+            UserObject data;
+        }
+
+        class Employee2 {
+            byte[] data;
+        }
+
+        // Class usage without a column name can work correctly only and only when each of key and value parts is single column.
+        KeyValueView<Long, Employee> v1 = t.keyValueView(Long.class, Employee.class);
+
+        KeyValueView<Long, Employee> v2 = t.keyValueView(
+                Mapper.of(Long.class), // Class usage without a column name can work correctly only and only when the key part is single column.
+                Mapper.builderFor(Employee.class).map("data", "val").build());
+
+        KeyValueView<Long, Employee> v3 = t.keyValueView(
+                Mapper.of("key", Long.class),
+                Mapper.builderFor(Employee.class).map("data", "val").build());
+
+        KeyValueView<Long, UserObject> v4 = t.keyValueView(
+                Mapper.of("key", Long.class),
+                Mapper.of("data", UserObject.class));
+
+        KeyValueView<Long, byte[]> v5 = t.keyValueView(
+                Mapper.of("key", Long.class),
+                Mapper.of("data", byte[].class));
     }
 }