You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/10/26 11:47:53 UTC

[1/2] ignite git commit: IGNITE-1753 Reworking JDBC POJO store to new configuration. Merging POJO and Portable stores in single store. Fixed several tests.

Repository: ignite
Updated Branches:
  refs/heads/ignite-1753-1282 [created] 8c1a71b28


http://git-wip-us.apache.org/repos/asf/ignite/blob/8c1a71b2/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreConfiguration.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreConfiguration.java
new file mode 100644
index 0000000..b333bc7
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreConfiguration.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.jdbc;
+
+import org.apache.ignite.cache.store.jdbc.dialect.*;
+
+import java.io.*;
+
+/**
+ * JDBC POJO store configuration.
+ */
+public class CacheJdbcPojoStoreConfiguration implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Default value for write attempts. */
+    public static final int DFLT_WRITE_ATTEMPTS = 2;
+
+    /** Default batch size for put and remove operations. */
+    public static final int DFLT_BATCH_SIZE = 512;
+
+    /** Default minimum row count threshold for parallel load cache. */
+    public static final int DFLT_PARALLEL_LOAD_CACHE_MINIMUM_THRESHOLD = 512;
+
+    /** Maximum batch size for writeAll and deleteAll operations. */
+    private int batchSz = DFLT_BATCH_SIZE;
+
+    /** Name of data source bean. */
+    private String dataSrcBean;
+
+    /** Database dialect. */
+    private JdbcDialect dialect;
+
+    /** Max workers thread count. These threads are responsible for load cache. */
+    private int maxPoolSz = Runtime.getRuntime().availableProcessors();
+
+    /** Maximum write attempts in case of database error. */
+    private int maxWrtAttempts = DFLT_WRITE_ATTEMPTS;
+
+    /** Parallel load cache minimum threshold. If {@code 0} then load sequentially. */
+    private int parallelLoadCacheMinThreshold = DFLT_PARALLEL_LOAD_CACHE_MINIMUM_THRESHOLD;
+
+    /** Types that store could process. */
+    private CacheJdbcPojoStoreType[] types;
+
+    /**
+     * Empty constructor (all values are initialized to their defaults).
+     */
+    public CacheJdbcPojoStoreConfiguration() {
+        /* No-op. */
+    }
+
+    /**
+     * Copy constructor (shallow: the dialect and the types array are shared with the source configuration).
+     *
+     * @param cfg Configuration to copy.
+     */
+    public CacheJdbcPojoStoreConfiguration(CacheJdbcPojoStoreConfiguration cfg) {
+        // Order alphabetically for maintenance purposes.
+        batchSz = cfg.getBatchSize();
+        dataSrcBean = cfg.getDataSourceBean();
+        dialect = cfg.getDialect();
+        maxPoolSz = cfg.getMaximumPoolSize();
+        maxWrtAttempts = cfg.getMaximumWriteAttempts();
+        parallelLoadCacheMinThreshold = cfg.getParallelLoadCacheMinimumThreshold();
+        types = cfg.getTypes();
+    }
+
+    /**
+     * Get maximum batch size for write and delete operations.
+     *
+     * @return Maximum batch size.
+     */
+    public int getBatchSize() {
+        return batchSz;
+    }
+
+    /**
+     * Set maximum batch size for write and delete operations.
+     *
+     * @param batchSz Maximum batch size.
+     * @return {@code This} for chaining.
+     */
+    public CacheJdbcPojoStoreConfiguration setBatchSize(int batchSz) {
+        this.batchSz = batchSz;
+
+        return this;
+    }
+
+    /**
+     * Gets name of the data source bean.
+     *
+     * @return Data source bean name.
+     */
+    public String getDataSourceBean() {
+        return dataSrcBean;
+    }
+
+    /**
+     * Sets name of the data source bean.
+     *
+     * @param dataSrcBean Data source bean name.
+     * @return {@code This} for chaining.
+     */
+    public CacheJdbcPojoStoreConfiguration setDataSourceBean(String dataSrcBean) {
+        this.dataSrcBean = dataSrcBean;
+
+        return this;
+    }
+
+    /**
+     * Get database dialect.
+     *
+     * @return Database dialect.
+     */
+    public JdbcDialect getDialect() {
+        return dialect;
+    }
+
+    /**
+     * Set database dialect.
+     *
+     * @param dialect Database dialect.
+     * @return {@code This} for chaining.
+     */
+    public CacheJdbcPojoStoreConfiguration setDialect(JdbcDialect dialect) {
+        this.dialect = dialect;
+
+        return this;
+    }
+
+    /**
+     * Get maximum workers thread count. These threads are responsible for queries execution.
+     *
+     * @return Maximum workers thread count.
+     */
+    public int getMaximumPoolSize() {
+        return maxPoolSz;
+    }
+
+    /**
+     * Set maximum workers thread count. These threads are responsible for queries execution.
+     *
+     * @param maxPoolSz Max workers thread count.
+     * @return {@code This} for chaining.
+     */
+    public CacheJdbcPojoStoreConfiguration setMaximumPoolSize(int maxPoolSz) {
+        this.maxPoolSz = maxPoolSz;
+
+        return this;
+    }
+
+    /**
+     * Gets maximum number of write attempts in case of database error.
+     *
+     * @return Maximum number of write attempts.
+     */
+    public int getMaximumWriteAttempts() {
+        return maxWrtAttempts;
+    }
+
+    /**
+     * Sets maximum number of write attempts in case of database error.
+     *
+     * @param maxWrtAttempts Number of write attempts.
+     * @return {@code This} for chaining.
+     */
+    public CacheJdbcPojoStoreConfiguration setMaximumWriteAttempts(int maxWrtAttempts) {
+        this.maxWrtAttempts = maxWrtAttempts;
+
+        return this;
+    }
+
+    /**
+     * Gets parallel load cache minimum row count threshold.
+     *
+     * @return Minimum row count threshold. If {@code 0} then load sequentially.
+     */
+    public int getParallelLoadCacheMinimumThreshold() {
+        return parallelLoadCacheMinThreshold;
+    }
+
+    /**
+     * Sets parallel load cache minimum row count threshold.
+     *
+     * @param parallelLoadCacheMinThreshold Minimum row count threshold. If {@code 0} then load sequentially.
+     * @return {@code This} for chaining.
+     */
+    public CacheJdbcPojoStoreConfiguration setParallelLoadCacheMinimumThreshold(int parallelLoadCacheMinThreshold) {
+        this.parallelLoadCacheMinThreshold = parallelLoadCacheMinThreshold;
+
+        return this;
+    }
+
+    /**
+     * Gets types known by store.
+     *
+     * @return Types known by store (the internal array itself, not a copy).
+     */
+    public CacheJdbcPojoStoreType[] getTypes() {
+        return types;
+    }
+
+    /**
+     * Sets types known by store.
+     *
+     * @param types Types that store should process (the array is stored as is, not copied).
+     * @return {@code This} for chaining.
+     */
+    public CacheJdbcPojoStoreConfiguration setTypes(CacheJdbcPojoStoreType... types) {
+        this.types = types;
+
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8c1a71b2/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactory.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactory.java
index c90a69b..6d8f8af 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactory.java
@@ -61,43 +61,65 @@ public class CacheJdbcPojoStoreFactory<K, V> implements Factory<CacheJdbcPojoSto
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** POJO store configuration. */
+    private CacheJdbcPojoStoreConfiguration cfg;
+
     /** Name of data source bean. */
+    @Deprecated
     private String dataSrcBean;
 
-    /** Data source. */
-    private transient DataSource dataSrc;
-
     /** Database dialect. */
+    @Deprecated
     private JdbcDialect dialect;
 
+    /** Data source. */
+    private transient DataSource dataSrc;
+
     /** Application context. */
     @SpringApplicationContextResource
-    private transient Object appContext;
+    private transient Object appCtx;
 
     /** {@inheritDoc} */
     @Override public CacheJdbcPojoStore<K, V> create() {
         CacheJdbcPojoStore<K, V> store = new CacheJdbcPojoStore<>();
 
-        store.setDialect(dialect);
+        // For backward compatibility create store configuration.
+        if (cfg == null) {
+            cfg = new CacheJdbcPojoStoreConfiguration();
+
+            cfg.setDataSourceBean(dataSrcBean);
+            cfg.setDialect(dialect);
+        }
+
+        store.setBatchSize(cfg.getBatchSize());
+        store.setDialect(cfg.getDialect());
+        store.setMaximumPoolSize(cfg.getMaximumPoolSize());
+        store.setMaximumWriteAttempts(cfg.getMaximumWriteAttempts());
+        store.setParallelLoadCacheMinimumThreshold(cfg.getParallelLoadCacheMinimumThreshold());
+        store.setTypes(cfg.getTypes());
 
         if (dataSrc != null)
             store.setDataSource(dataSrc);
-        else if (dataSrcBean != null) {
-            if (appContext == null)
-                throw new IgniteException("Spring application context resource is not injected.");
+        else {
+            String dtSrcBean = cfg.getDataSourceBean();
 
-            IgniteSpringHelper spring;
+            if (dtSrcBean != null) {
+                if (appCtx == null)
+                    throw new IgniteException("Spring application context resource is not injected.");
 
-            try {
-                spring = IgniteComponentType.SPRING.create(false);
+                IgniteSpringHelper spring;
 
-                DataSource data = spring.loadBeanFromAppContext(appContext, dataSrcBean);
+                try {
+                    spring = IgniteComponentType.SPRING.create(false);
 
-                store.setDataSource(data);
-            }
-            catch (Exception e) {
-                throw new IgniteException("Failed to load bean in application context [beanName=" + dataSrcBean +
-                    ", igniteConfig=" + appContext + ']', e);
+                    DataSource data = spring.loadBeanFromAppContext(appCtx, dtSrcBean);
+
+                    store.setDataSource(data);
+                }
+                catch (Exception e) {
+                    throw new IgniteException("Failed to load bean in application context [beanName=" + dtSrcBean +
+                        ", igniteConfig=" + appCtx + ']', e);
+                }
             }
         }
 
@@ -105,27 +127,27 @@ public class CacheJdbcPojoStoreFactory<K, V> implements Factory<CacheJdbcPojoSto
     }
 
     /**
-     * Sets name of the data source bean.
+     * Sets store configuration.
      *
-     * @param dataSrcBean Data source bean name.
+     * @param cfg Configuration to use.
      * @return {@code This} for chaining.
-     * @see CacheJdbcPojoStore#setDataSource(DataSource)
      */
-    public CacheJdbcPojoStoreFactory<K, V> setDataSourceBean(String dataSrcBean) {
-        this.dataSrcBean = dataSrcBean;
+    public CacheJdbcPojoStoreFactory<K, V> setConfiguration(CacheJdbcPojoStoreConfiguration cfg) {
+        this.cfg = cfg;
 
         return this;
     }
 
     /**
-     * Sets data source. Data source should be fully configured and ready-to-use.
+     * Sets name of the data source bean.
      *
-     * @param dataSrc Data source.
+     * @param dataSrcBean Data source bean name.
      * @return {@code This} for chaining.
      * @see CacheJdbcPojoStore#setDataSource(DataSource)
      */
-    public CacheJdbcPojoStoreFactory<K, V> setDataSource(DataSource dataSrc) {
-        this.dataSrc = dataSrc;
+    @Deprecated
+    public CacheJdbcPojoStoreFactory<K, V> setDataSourceBean(String dataSrcBean) {
+        this.dataSrcBean = dataSrcBean;
 
         return this;
     }
@@ -136,12 +158,26 @@ public class CacheJdbcPojoStoreFactory<K, V> implements Factory<CacheJdbcPojoSto
      * @param dialect Database dialect.
      * @see CacheJdbcPojoStore#setDialect(JdbcDialect)
      */
+    @Deprecated
     public void setDialect(JdbcDialect dialect) {
         this.dialect = dialect;
     }
 
+    /**
+     * Sets data source. Data source should be fully configured and ready-to-use.
+     *
+     * @param dataSrc Data source.
+     * @return {@code This} for chaining.
+     * @see CacheJdbcPojoStore#setDataSource(DataSource)
+     */
+    public CacheJdbcPojoStoreFactory<K, V> setDataSource(DataSource dataSrc) {
+        this.dataSrc = dataSrc;
+
+        return this;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(CacheJdbcPojoStoreFactory.class, this);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8c1a71b2/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreType.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreType.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreType.java
new file mode 100644
index 0000000..e755165
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreType.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.jdbc;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+
+/**
+ * Description for type that could be stored into database by store.
+ */
+public class CacheJdbcPojoStoreType implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Cache name. */
+    private String cacheName;
+
+    /** Schema name in database. */
+    private String dbSchema;
+
+    /** Table name in database. */
+    private String dbTbl;
+
+    /** Key class used to store key in cache. */
+    private String keyType;
+
+    /** List of fields descriptors for key object. */
+    @GridToStringInclude
+    private CacheJdbcPojoStoreTypeField[] keyFields;
+
+    /** Value class used to store value in cache. */
+    private String valType;
+
+    /** List of fields descriptors for value object. */
+    @GridToStringInclude
+    private CacheJdbcPojoStoreTypeField[] valFields;
+
+    /** If {@code true} object is stored as IgniteObject. */
+    private boolean keepSerialized;
+
+    /**
+     * Empty constructor (all values are initialized to their defaults).
+     */
+    public CacheJdbcPojoStoreType() {
+        /* No-op. */
+    }
+
+    /**
+     * Copy constructor (shallow: the key and value field arrays are shared with the source type).
+     *
+     * @param type Type to copy.
+     */
+    public CacheJdbcPojoStoreType(CacheJdbcPojoStoreType type) {
+        cacheName = type.getCacheName();
+
+        dbSchema = type.getDatabaseSchema();
+        dbTbl = type.getDatabaseTable();
+
+        keyType = type.getKeyType();
+        keyFields = type.getKeyFields();
+
+        valType = type.getValueType();
+        valFields = type.getValueFields();
+
+        keepSerialized = type.isKeepSerialized();
+    }
+
+    /**
+     * Gets associated cache name.
+     *
+     * @return Cache name.
+     */
+    public String getCacheName() {
+        return cacheName;
+    }
+
+    /**
+     * Sets associated cache name.
+     * @param cacheName Cache name.
+     * @return {@code this} for chaining.
+     */
+    public CacheJdbcPojoStoreType setCacheName(String cacheName) {
+        this.cacheName = cacheName;
+
+        return this;
+    }
+
+    /**
+     * Gets database schema name.
+     *
+     * @return Schema name.
+     */
+    public String getDatabaseSchema() {
+        return dbSchema;
+    }
+
+    /**
+     * Sets database schema name.
+     * @param dbSchema Schema name.
+     * @return {@code this} for chaining.
+     */
+    public CacheJdbcPojoStoreType setDatabaseSchema(String dbSchema) {
+        this.dbSchema = dbSchema;
+
+        return this;
+    }
+
+    /**
+     * Gets table name in database.
+     *
+     * @return Table name in database.
+     */
+    public String getDatabaseTable() {
+        return dbTbl;
+    }
+
+    /**
+     * Sets table name in database.
+     *
+     * @param dbTbl Table name in database.
+     * @return {@code this} for chaining.
+     */
+    public CacheJdbcPojoStoreType setDatabaseTable(String dbTbl) {
+        this.dbTbl = dbTbl;
+
+        return this;
+    }
+
+    /**
+     * Gets key type.
+     *
+     * @return Key type.
+     */
+    public String getKeyType() {
+        return keyType;
+    }
+
+    /**
+     * Sets key type.
+     *
+     * @param keyType Key type.
+     * @return {@code this} for chaining.
+     */
+    public CacheJdbcPojoStoreType setKeyType(String keyType) {
+        this.keyType = keyType;
+
+        return this;
+    }
+
+    /**
+     * Sets key type from a class; the class name ({@link Class#getName()}) is stored.
+     *
+     * @param cls Key type class.
+     * @return {@code this} for chaining.
+     */
+    public CacheJdbcPojoStoreType setKeyType(Class<?> cls) {
+        setKeyType(cls.getName());
+
+        return this;
+    }
+
+    /**
+     * Gets value type.
+     *
+     * @return Value type.
+     */
+    public String getValueType() {
+        return valType;
+    }
+
+    /**
+     * Sets value type.
+     *
+     * @param valType Value type.
+     * @return {@code this} for chaining.
+     */
+    public CacheJdbcPojoStoreType setValueType(String valType) {
+        this.valType = valType;
+
+        return this;
+    }
+
+    /**
+     * Sets value type from a class; the class name ({@link Class#getName()}) is stored.
+     *
+     * @param cls Value type class.
+     * @return {@code this} for chaining.
+     */
+    public CacheJdbcPojoStoreType setValueType(Class<?> cls) {
+        setValueType(cls.getName());
+
+        return this;
+    }
+
+    /**
+     * Gets optional persistent key fields (needed only if {@link CacheJdbcPojoStore} is used).
+     *
+     * @return Persistent key fields.
+     */
+    public CacheJdbcPojoStoreTypeField[] getKeyFields() {
+        return keyFields;
+    }
+
+    /**
+     * Sets optional persistent key fields (needed only if {@link CacheJdbcPojoStore} is used).
+     *
+     * @param keyFields Persistent key fields.
+     * @return {@code this} for chaining.
+     */
+    public CacheJdbcPojoStoreType setKeyFields(CacheJdbcPojoStoreTypeField... keyFields) {
+        this.keyFields = keyFields;
+
+        return this;
+    }
+
+    /**
+     * Gets optional persistent value fields (needed only if {@link CacheJdbcPojoStore} is used).
+     *
+     * @return Persistent value fields.
+     */
+    public CacheJdbcPojoStoreTypeField[] getValueFields() {
+        return valFields;
+    }
+
+    /**
+     * Sets optional persistent value fields (needed only if {@link CacheJdbcPojoStore} is used).
+     *
+     * @param valFields Persistent value fields.
+     * @return {@code this} for chaining.
+     */
+    public CacheJdbcPojoStoreType setValueFields(CacheJdbcPojoStoreTypeField... valFields) {
+        this.valFields = valFields;
+
+        return this;
+    }
+
+    /**
+     * Gets how value is stored in cache.
+     *
+     * @return {@code true} if object is stored as IgniteObject.
+     */
+    public boolean isKeepSerialized() {
+        return keepSerialized;
+    }
+
+    /**
+     * Sets how value is stored in cache.
+     *
+     * @param keepSerialized {@code true} if object is stored as IgniteObject.
+     * @return {@code this} for chaining.
+     */
+    public CacheJdbcPojoStoreType setKeepSerialized(boolean keepSerialized) {
+        this.keepSerialized = keepSerialized;
+
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8c1a71b2/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTypeField.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTypeField.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTypeField.java
new file mode 100644
index 0000000..46a2647
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTypeField.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.jdbc;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Description of how field declared in database and in cache.
+ */
+public class CacheJdbcPojoStoreTypeField implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Field JDBC type in database (e.g. a {@link java.sql.Types} constant). */
+    private int dbFieldType;
+
+    /** Field name in database. May be {@code null} until set. */
+    private String dbFieldName;
+
+    /** Field java type. May be {@code null} until set. */
+    private Class<?> javaFieldType;
+
+    /** Field name in java object. May be {@code null} until set. */
+    private String javaFieldName;
+
+    /**
+     * Default constructor. Leaves the names and the java type {@code null} and the JDBC type {@code 0}.
+     */
+    public CacheJdbcPojoStoreTypeField() {
+        // No-op.
+    }
+
+    /**
+     * Full constructor.
+     *
+     * @param dbFieldType Field JDBC type in database.
+     * @param dbFieldName Field name in database.
+     * @param javaFieldType Field java type.
+     * @param javaFieldName Field name in java object.
+     */
+    public CacheJdbcPojoStoreTypeField(int dbFieldType, String dbFieldName, Class<?> javaFieldType, String javaFieldName) {
+        this.dbFieldType = dbFieldType;
+        this.dbFieldName = dbFieldName;
+        this.javaFieldType = javaFieldType;
+        this.javaFieldName = javaFieldName;
+    }
+
+    /**
+     * Copy constructor.
+     *
+     * @param field Field to copy.
+     */
+    public CacheJdbcPojoStoreTypeField(CacheJdbcPojoStoreTypeField field) {
+        this(field.getDatabaseFieldType(), field.getDatabaseFieldName(),
+            field.getJavaFieldType(), field.getJavaFieldName());
+    }
+
+    /**
+     * @return Column JDBC type in database.
+     */
+    public int getDatabaseFieldType() {
+        return dbFieldType;
+    }
+
+    /**
+     * @param dbType Column JDBC type in database.
+     */
+    public void setDatabaseFieldType(int dbType) {
+        this.dbFieldType = dbType;
+    }
+
+    /**
+     * @return Column name in database.
+     */
+    public String getDatabaseFieldName() {
+        return dbFieldName;
+    }
+
+    /**
+     * @param dbName Column name in database.
+     */
+    public void setDatabaseFieldName(String dbName) {
+        this.dbFieldName = dbName;
+    }
+
+    /**
+     * @return Field java type.
+     */
+    public Class<?> getJavaFieldType() {
+        return javaFieldType;
+    }
+
+    /**
+     * @param javaType Corresponding java type.
+     */
+    public void setJavaFieldType(Class<?> javaType) {
+        this.javaFieldType = javaType;
+    }
+
+    /**
+     * @return Field name in java object.
+     */
+    public String getJavaFieldName() {
+        return javaFieldName;
+    }
+
+    /**
+     * @param javaName Field name in java object.
+     */
+    public void setJavaFieldName(String javaName) {
+        this.javaFieldName = javaName;
+    }
+
+    /** {@inheritDoc} Two descriptors are equal when all four properties match; unset properties are allowed. */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (!(o instanceof CacheJdbcPojoStoreTypeField))
+            return false;
+
+        CacheJdbcPojoStoreTypeField that = (CacheJdbcPojoStoreTypeField)o;
+
+        // Names are null after the default constructor, so compare them null-safely instead of dereferencing.
+        return dbFieldType == that.dbFieldType && java.util.Objects.equals(dbFieldName, that.dbFieldName) &&
+            javaFieldType == that.javaFieldType && java.util.Objects.equals(javaFieldName, that.javaFieldName);
+    }
+
+    /** {@inheritDoc} Unset properties contribute {@code 0}; fully populated instances hash as before. */
+    @Override public int hashCode() {
+        int res = dbFieldType;
+        res = 31 * res + java.util.Objects.hashCode(dbFieldName);
+
+        res = 31 * res + java.util.Objects.hashCode(javaFieldType);
+        res = 31 * res + java.util.Objects.hashCode(javaFieldName);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CacheJdbcPojoStoreTypeField.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8c1a71b2/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java
index 0ad2cad..b2d871c 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java
@@ -17,23 +17,19 @@
 
 package org.apache.ignite.cache.store.jdbc;
 
-import java.net.MalformedURLException;
-import java.net.URL;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Timestamp;
+import java.sql.Types;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import javax.cache.integration.CacheWriterException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.cache.CacheTypeMetadata;
-import org.apache.ignite.cache.store.jdbc.dialect.BasicJdbcDialect;
-import org.apache.ignite.cache.store.jdbc.dialect.JdbcDialect;
+
+import org.apache.ignite.cache.store.jdbc.dialect.H2Dialect;
 import org.apache.ignite.cache.store.jdbc.model.Organization;
 import org.apache.ignite.cache.store.jdbc.model.OrganizationKey;
 import org.apache.ignite.cache.store.jdbc.model.Person;
@@ -41,16 +37,11 @@ import org.apache.ignite.cache.store.jdbc.model.PersonComplexKey;
 import org.apache.ignite.cache.store.jdbc.model.PersonKey;
 import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
 import org.apache.ignite.internal.util.typedef.CI2;
-import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.cache.GridAbstractCacheStoreSelfTest;
 import org.h2.jdbcx.JdbcConnectionPool;
-import org.springframework.beans.BeansException;
-import org.springframework.beans.factory.xml.XmlBeanDefinitionReader;
-import org.springframework.context.support.GenericApplicationContext;
-import org.springframework.core.io.UrlResource;
 
 /**
  * Class for {@code PojoCacheStore} tests.
@@ -59,9 +50,6 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<Cache
     /** DB connection URL. */
     private static final String DFLT_CONN_URL = "jdbc:h2:mem:autoCacheStore;DB_CLOSE_DELAY=-1";
 
-    /** Default config with mapping. */
-    private static final String DFLT_MAPPING_CONFIG = "modules/core/src/test/config/store/jdbc/ignite-type-metadata.xml";
-
     /** Organization count. */
     protected static final int ORGANIZATION_CNT = 1000;
 
@@ -77,71 +65,89 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<Cache
 
     /** {@inheritDoc} */
     @Override protected CacheJdbcPojoStore<Object, Object> store() {
-        CacheJdbcPojoStore<Object, Object> store = new CacheJdbcPojoStore<>();
-
-//        PGPoolingDataSource ds = new PGPoolingDataSource();
-//        ds.setUser("postgres");
-//        ds.setPassword("postgres");
-//        ds.setServerName("ip");
-//        ds.setDatabaseName("postgres");
-//        store.setDataSource(ds);
-
-//        MysqlDataSource ds = new MysqlDataSource();
-//        ds.setURL("jdbc:mysql://ip:port/dbname");
-//        ds.setUser("mysql");
-//        ds.setPassword("mysql");
-
+        CacheJdbcPojoStoreFactory<Object, Object> storeFactory = new CacheJdbcPojoStoreFactory<>();
+
+        CacheJdbcPojoStoreConfiguration storeCfg = new CacheJdbcPojoStoreConfiguration();
+        storeCfg.setDialect(new H2Dialect());
+
+        CacheJdbcPojoStoreType[] storeTypes = new CacheJdbcPojoStoreType[6];
+
+        storeTypes[0] = new CacheJdbcPojoStoreType();
+        storeTypes[0].setDatabaseSchema("PUBLIC");
+        storeTypes[0].setDatabaseTable("ORGANIZATION");
+        storeTypes[0].setKeyType("org.apache.ignite.cache.store.jdbc.model.OrganizationKey");
+        storeTypes[0].setKeyFields(new CacheJdbcPojoStoreTypeField(Types.INTEGER, "ID", Integer.class, "id"));
+
+        storeTypes[0].setValueType("org.apache.ignite.cache.store.jdbc.model.Organization");
+        storeTypes[0].setValueFields(
+            new CacheJdbcPojoStoreTypeField(Types.INTEGER, "ID", Integer.class, "id"),
+            new CacheJdbcPojoStoreTypeField(Types.VARCHAR, "NAME", String.class, "name"),
+            new CacheJdbcPojoStoreTypeField(Types.VARCHAR, "CITY", String.class, "city"));
+
+        storeTypes[1] = new CacheJdbcPojoStoreType();
+        storeTypes[1].setDatabaseSchema("PUBLIC");
+        storeTypes[1].setDatabaseTable("PERSON");
+        storeTypes[1].setKeyType("org.apache.ignite.cache.store.jdbc.model.PersonKey");
+        storeTypes[1].setKeyFields(new CacheJdbcPojoStoreTypeField(Types.INTEGER, "ID", Integer.class, "id"));
+
+        storeTypes[1].setValueType("org.apache.ignite.cache.store.jdbc.model.Person");
+        storeTypes[1].setValueFields(
+            new CacheJdbcPojoStoreTypeField(Types.INTEGER, "ID", Integer.class, "id"),
+            new CacheJdbcPojoStoreTypeField(Types.INTEGER, "ORG_ID", Integer.class, "orgId"),
+            new CacheJdbcPojoStoreTypeField(Types.VARCHAR, "NAME", String.class, "name"));
+
+        storeTypes[2] = new CacheJdbcPojoStoreType();
+        storeTypes[2].setDatabaseSchema("PUBLIC");
+        storeTypes[2].setDatabaseTable("PERSON_COMPLEX");
+        storeTypes[2].setKeyType("org.apache.ignite.cache.store.jdbc.model.PersonComplexKey");
+        storeTypes[2].setKeyFields(
+            new CacheJdbcPojoStoreTypeField(Types.INTEGER, "ID", int.class, "id"),
+            new CacheJdbcPojoStoreTypeField(Types.INTEGER, "ORG_ID", int.class, "orgId"),
+            new CacheJdbcPojoStoreTypeField(Types.INTEGER, "CITY_ID", int.class, "cityId"));
+
+        storeTypes[2].setValueType("org.apache.ignite.cache.store.jdbc.model.Person");
+        storeTypes[2].setValueFields(
+            new CacheJdbcPojoStoreTypeField(Types.INTEGER, "ID", Integer.class, "id"),
+            new CacheJdbcPojoStoreTypeField(Types.INTEGER, "ORG_ID", Integer.class, "orgId"),
+            new CacheJdbcPojoStoreTypeField(Types.VARCHAR, "NAME", String.class, "name"),
+            new CacheJdbcPojoStoreTypeField(Types.INTEGER, "SALARY", Integer.class, "salary"));
+
+        storeTypes[3] = new CacheJdbcPojoStoreType();
+        storeTypes[3].setDatabaseSchema("PUBLIC");
+        storeTypes[3].setDatabaseTable("TIMESTAMP_ENTRIES");
+        storeTypes[3].setKeyType("java.sql.Timestamp");
+        storeTypes[3].setKeyFields(new CacheJdbcPojoStoreTypeField(Types.TIMESTAMP, "KEY", Timestamp.class, null));
+
+        storeTypes[3].setValueType("java.lang.Integer");
+        storeTypes[3].setValueFields(new CacheJdbcPojoStoreTypeField(Types.INTEGER, "VAL", Integer.class, null));
+
+        storeTypes[4] = new CacheJdbcPojoStoreType();
+        storeTypes[4].setDatabaseSchema("PUBLIC");
+        storeTypes[4].setDatabaseTable("STRING_ENTRIES");
+        storeTypes[4].setKeyType("java.lang.String");
+        storeTypes[4].setKeyFields(new CacheJdbcPojoStoreTypeField(Types.VARCHAR, "KEY", String.class, null));
+
+        storeTypes[4].setValueType("java.lang.String");
+        storeTypes[4].setValueFields(new CacheJdbcPojoStoreTypeField(Types.VARCHAR, "VAL", String.class, null));
+
+        storeTypes[5] = new CacheJdbcPojoStoreType();
+        storeTypes[5].setDatabaseSchema("PUBLIC");
+        storeTypes[5].setDatabaseTable("UUID_ENTRIES");
+        storeTypes[5].setKeyType("java.util.UUID");
+        storeTypes[5].setKeyFields(new CacheJdbcPojoStoreTypeField(Types.BINARY, "KEY", UUID.class, null));
+
+        storeTypes[5].setValueType("java.util.UUID");
+        storeTypes[5].setValueFields(new CacheJdbcPojoStoreTypeField(Types.BINARY, "VAL", UUID.class, null));
+
+        storeCfg.setTypes(storeTypes);
+
+        storeFactory.setConfiguration(storeCfg);
+
+        CacheJdbcPojoStore<Object, Object> store = storeFactory.create();
+
+        // H2 DataSource
         store.setDataSource(JdbcConnectionPool.create(DFLT_CONN_URL, "sa", ""));
 
-        URL cfgUrl;
-
-        try {
-            cfgUrl = new URL(DFLT_MAPPING_CONFIG);
-        }
-        catch (MalformedURLException ignore) {
-            cfgUrl = U.resolveIgniteUrl(DFLT_MAPPING_CONFIG);
-        }
-
-        if (cfgUrl == null)
-            throw new IgniteException("Failed to resolve metadata path: " + DFLT_MAPPING_CONFIG);
-
-        try {
-            GenericApplicationContext springCtx = new GenericApplicationContext();
-
-            new XmlBeanDefinitionReader(springCtx).loadBeanDefinitions(new UrlResource(cfgUrl));
-
-            springCtx.refresh();
-
-            Collection<CacheTypeMetadata> typeMeta = springCtx.getBeansOfType(CacheTypeMetadata.class).values();
-
-            Map<Integer, Map<Object, CacheAbstractJdbcStore.EntryMapping>> cacheMappings = new HashMap<>();
-
-            JdbcDialect dialect = store.resolveDialect();
-
-            GridTestUtils.setFieldValue(store, CacheAbstractJdbcStore.class, "dialect", dialect);
-
-            Map<Object, CacheAbstractJdbcStore.EntryMapping> entryMappings = U.newHashMap(typeMeta.size());
-
-            for (CacheTypeMetadata type : typeMeta)
-                entryMappings.put(store.keyTypeId(type.getKeyType()),
-                    new CacheAbstractJdbcStore.EntryMapping(null, dialect, type));
-
-            store.prepareBuilders(null, typeMeta);
-
-            cacheMappings.put(null, entryMappings);
-
-            GridTestUtils.setFieldValue(store, CacheAbstractJdbcStore.class, "cacheMappings", cacheMappings);
-        }
-        catch (BeansException e) {
-            if (X.hasCause(e, ClassNotFoundException.class))
-                throw new IgniteException("Failed to instantiate Spring XML application context " +
-                    "(make sure all classes used in Spring configuration are present at CLASSPATH) " +
-                    "[springUrl=" + cfgUrl + ']', e);
-            else
-                throw new IgniteException("Failed to instantiate Spring XML application context [springUrl=" +
-                    cfgUrl + ", err=" + e.getMessage() + ']', e);
-        }
-
         return store;
     }
 
@@ -152,7 +158,7 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<Cache
     @Override protected void inject(CacheJdbcPojoStore<Object, Object> store) throws Exception {
         getTestResources().inject(store);
 
-        GridTestUtils.setFieldValue(store, CacheAbstractJdbcStore.class, "ses", ses);
+        GridTestUtils.setFieldValue(store, CacheJdbcPojoStore.class, "ses", ses);
     }
 
     /** {@inheritDoc} */
@@ -224,7 +230,6 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<Cache
         super.beforeTest();
     }
 
-
     /**
      * @throws Exception If failed.
      */
@@ -274,7 +279,7 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<Cache
             if (i > 0)
                 prnComplexStmt.setInt(5, 1000 + i * 500);
             else // Add person with null salary
-                prnComplexStmt.setNull(5, java.sql.Types.INTEGER);
+                prnComplexStmt.setNull(5, Types.INTEGER);
 
             prnComplexStmt.addBatch();
         }
@@ -302,9 +307,9 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<Cache
 
                     Person val = (Person)v;
 
-                    assert key.getId() == val.getId();
-                    assert key.getOrgId() == val.getOrgId();
-                    assertEquals("name"  + key.getId(), val.getName());
+                    assertTrue("Key ID should be the same as value ID", key.getId() == val.getId());
+                    assertTrue("Key orgID should be the same as value orgID", key.getOrgId() == val.getOrgId());
+                    assertEquals("name" + key.getId(), val.getName());
 
                     prnComplexKeys.add((PersonComplexKey)k);
                 }
@@ -351,25 +356,23 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<Cache
      * @throws Exception If failed.
      */
     public void testWriteRetry() throws Exception {
+        CacheJdbcPojoStore<Object, Object> store = store();
+
         // Special dialect that will skip updates, to test write retry.
-        BasicJdbcDialect dialect = new BasicJdbcDialect() {
+        store.setDialect(new H2Dialect() {
             /** {@inheritDoc} */
-            @Override public String updateQuery(String tblName, Collection<String> keyCols, Iterable<String> valCols) {
-                return super.updateQuery(tblName, keyCols, valCols) + " AND 1 = 0";
+            @Override public boolean hasMerge() {
+                return false;
             }
-        };
-
-        store.setDialect(dialect);
-
-        Map<String, Map<Object, CacheAbstractJdbcStore.EntryMapping>> cacheMappings =
-            GridTestUtils.getFieldValue(store, CacheAbstractJdbcStore.class, "cacheMappings");
-
-        CacheAbstractJdbcStore.EntryMapping em = cacheMappings.get(null).get(OrganizationKey.class);
 
-        CacheTypeMetadata typeMeta = GridTestUtils.getFieldValue(em, CacheAbstractJdbcStore.EntryMapping.class, "typeMeta");
+            /** {@inheritDoc} */
+            @Override public String updateQuery(String tblName, Collection<String> keyCols,
+                Iterable<String> valCols) {
+                return super.updateQuery(tblName, keyCols, valCols) + " AND 1 = 0";
+            }
+        });
 
-        cacheMappings.get(null).put(OrganizationKey.class,
-            new CacheAbstractJdbcStore.EntryMapping(null, dialect, typeMeta));
+        inject(store);
 
         Connection conn = store.openConnection(false);
 
@@ -392,6 +395,8 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<Cache
 
         try {
             store.write(new CacheEntryImpl<>(k1, v1));
+
+            fail("CacheWriterException wasn't thrown.");
         }
         catch (CacheWriterException e) {
             if (!e.getMessage().startsWith("Failed insert entry in database, violate a unique index or primary key") ||
@@ -418,4 +423,4 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<Cache
 
         assertNull(store.load(k));
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8c1a71b2/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreAbstractMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreAbstractMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreAbstractMultithreadedSelfTest.java
index 757cedd..42cc4c9 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreAbstractMultithreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreAbstractMultithreadedSelfTest.java
@@ -60,7 +60,7 @@ import static org.apache.ignite.testframework.GridTestUtils.runMultiThreadedAsyn
 /**
  *
  */
-public abstract class CacheJdbcStoreAbstractMultithreadedSelfTest<T extends CacheAbstractJdbcStore>
+public abstract class CacheJdbcStoreAbstractMultithreadedSelfTest<T extends CacheJdbcPojoStore>
     extends GridCommonAbstractTest {
     /** Default config with mapping. */
     private static final String DFLT_MAPPING_CONFIG = "modules/core/src/test/config/store/jdbc/ignite-type-metadata.xml";
@@ -78,7 +78,7 @@ public abstract class CacheJdbcStoreAbstractMultithreadedSelfTest<T extends Cach
     private static final int BATCH_CNT = 2000;
 
     /** Cache store. */
-    protected static CacheAbstractJdbcStore store;
+    protected static CacheJdbcPojoStore store;
 
     /** {@inheritDoc} */
     @Override protected void beforeTestsStarted() throws Exception {
@@ -308,4 +308,4 @@ public abstract class CacheJdbcStoreAbstractMultithreadedSelfTest<T extends Cach
             }
         }, 8, "tx");
     }
-}
\ No newline at end of file
+}


[2/2] ignite git commit: IGNITE-1753 Reworking JDBC POJO store to new configuration. Merging POJO and Portable stores in single store. Fixed several tests.

Posted by ak...@apache.org.
IGNITE-1753 Reworking JDBC POJO store to new configuration. Merging POJO and Portable stores in single store. Fixed several tests.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8c1a71b2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8c1a71b2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8c1a71b2

Branch: refs/heads/ignite-1753-1282
Commit: 8c1a71b28f4ffc73457c1a58efc735bbc2ad2ce6
Parents: 9d67c20
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Mon Oct 26 17:47:43 2015 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Mon Oct 26 17:47:43 2015 +0700

----------------------------------------------------------------------
 .../store/jdbc/CacheAbstractJdbcStore.java      |    3 +-
 .../cache/store/jdbc/CacheJdbcPojoStore.java    | 2209 +++++++++++++++++-
 .../jdbc/CacheJdbcPojoStoreConfiguration.java   |  230 ++
 .../store/jdbc/CacheJdbcPojoStoreFactory.java   |   90 +-
 .../store/jdbc/CacheJdbcPojoStoreType.java      |  272 +++
 .../store/jdbc/CacheJdbcPojoStoreTypeField.java |  160 ++
 .../store/jdbc/CacheJdbcPojoStoreTest.java      |  205 +-
 ...eJdbcStoreAbstractMultithreadedSelfTest.java |    6 +-
 8 files changed, 2953 insertions(+), 222 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8c1a71b2/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
index 6e27b9a..bd04fe7 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
@@ -113,6 +113,7 @@ import static java.sql.Statement.SUCCESS_NO_INFO;
  *    ...
  * </pre>
  */
+@Deprecated
 public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>, LifecycleAware {
     /** Max attempt write count. */
     protected static final int MAX_ATTEMPT_WRITE_COUNT = 2;
@@ -1821,4 +1822,4 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
             }
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8c1a71b2/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java
index d78ea48..6b3473a 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java
@@ -19,36 +19,1814 @@ package org.apache.ignite.cache.store.jdbc;
 
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.sql.BatchUpdateException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.cache.Cache;
 import javax.cache.CacheException;
 import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import javax.sql.DataSource;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgnitePortables;
 import org.apache.ignite.cache.CacheTypeFieldMetadata;
 import org.apache.ignite.cache.CacheTypeMetadata;
 import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreSession;
+import org.apache.ignite.cache.store.jdbc.dialect.BasicJdbcDialect;
+import org.apache.ignite.cache.store.jdbc.dialect.DB2Dialect;
+import org.apache.ignite.cache.store.jdbc.dialect.H2Dialect;
+import org.apache.ignite.cache.store.jdbc.dialect.JdbcDialect;
+import org.apache.ignite.cache.store.jdbc.dialect.MySQLDialect;
+import org.apache.ignite.cache.store.jdbc.dialect.OracleDialect;
+import org.apache.ignite.cache.store.jdbc.dialect.SQLServerDialect;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lifecycle.LifecycleAware;
+import org.apache.ignite.portable.PortableBuilder;
+import org.apache.ignite.portable.PortableObject;
+import org.apache.ignite.resources.CacheStoreSessionResource;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.transactions.Transaction;
 import org.jetbrains.annotations.Nullable;
 
+import static java.sql.Statement.EXECUTE_FAILED;
+import static java.sql.Statement.SUCCESS_NO_INFO;
+import static org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreConfiguration.DFLT_BATCH_SIZE;
+import static org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreConfiguration.DFLT_WRITE_ATTEMPTS;
+import static org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreConfiguration.DFLT_PARALLEL_LOAD_CACHE_MINIMUM_THRESHOLD;
+
 /**
  * Implementation of {@link CacheStore} backed by JDBC and POJO via reflection.
  *
- * This implementation stores objects in underlying database using java beans mapping description via reflection.
- * <p>
+ * This implementation stores objects in underlying database using java beans mapping description via reflection. <p>
  * Use {@link CacheJdbcPojoStoreFactory} factory to pass {@link CacheJdbcPojoStore} to {@link CacheConfiguration}.
  */
-public class CacheJdbcPojoStore<K, V> extends CacheAbstractJdbcStore<K, V> {
+public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAware {
+    /** Connection attribute property name. */
+    private static final String ATTR_CONN_PROP = "JDBC_STORE_CONNECTION";
+
+    /** Auto-injected store session. */
+    @CacheStoreSessionResource
+    private CacheStoreSession ses;
+
+    /** Auto injected ignite instance. */
+    @IgniteInstanceResource
+    private Ignite ignite;
+
+    /** Auto-injected logger instance. */
+    @LoggerResource
+    private IgniteLogger log;
+
+    /** Lock for metadata cache. */
+    @GridToStringExclude
+    private final Lock cacheMappingsLock = new ReentrantLock();
+
+    /** Data source. */
+    private DataSource dataSrc;
+
+    /** Cache with entry mapping description. (cache name, (key id, mapping description)). */
+    private volatile Map<String, Map<Object, EntryMapping>> cacheMappings = Collections.emptyMap();
+
+    /** Maximum batch size for writeAll and deleteAll operations. */
+    private int batchSz = DFLT_BATCH_SIZE;
+
+    /** Database dialect. */
+    private JdbcDialect dialect;
+
+    /** Max workers thread count. These threads are responsible for load cache. */
+    private int maxPoolSz = Runtime.getRuntime().availableProcessors();
+
+    /** Maximum write attempts in case of database error. */
+    private int maxWrtAttempts = DFLT_WRITE_ATTEMPTS;
+
+    /** Parallel load cache minimum threshold. If {@code 0} then load sequentially. */
+    private int parallelLoadCacheMinThreshold = DFLT_PARALLEL_LOAD_CACHE_MINIMUM_THRESHOLD;
+
+    /** Types that store could process. */
+    private CacheJdbcPojoStoreType[] types;
+
+    /** Map for quick check whether type is POJO or Portable. */
+    private volatile Map<String, Map<String, Boolean>> keepSerializedTypes = new HashMap<>();
+
+    /** POJO methods cache. */
+    private volatile Map<String, Map<String, PojoMethodsCache>> pojoMethods = Collections.emptyMap();
+
+    /** Portables builders cache. */
+    private volatile Map<String, Map<String, PortableBuilder>> portableBuilders = Collections.emptyMap();
+
+    /**
+     * Tells whether the given type is configured to be kept in portable form rather than as a POJO.
+     *
+     * @param cacheName Name of the cache whose type settings are consulted.
+     * @param typeName Name of the type to look up.
+     * @return {@code true} if portable format is configured for the type.
+     * @throws CacheException If no settings are registered for the cache or for the type.
+     */
+    private boolean isKeepSerialized(String cacheName, String typeName) {
+        Map<String, Boolean> typeFlags = keepSerializedTypes.get(cacheName);
+
+        if (typeFlags == null)
+            throw new CacheException("Failed to find types metadata for cache: " + cacheName);
+
+        Boolean flag = typeFlags.get(typeName);
+
+        if (flag == null)
+            throw new CacheException("Failed to find type metadata for type: " + typeName);
+
+        return flag;
+    }
+
+    /**
+     * Reads a field value from a cache object so it can be bound as a query parameter.
+     *
+     * @param cacheName Cache name.
+     * @param typeName Type name, used to decide between portable and POJO extraction.
+     * @param fieldName Name of the field to read.
+     * @param obj Cache object to read from.
+     * @return Value of the field in the given object.
+     * @throws CacheException If type settings are missing or the value cannot be read.
+     */
+    @Nullable private Object extractParameter(@Nullable String cacheName, String typeName, String fieldName,
+        Object obj) throws CacheException {
+        if (isKeepSerialized(cacheName, typeName))
+            return extractPortableParameter(fieldName, obj);
+        return extractPojoParameter(cacheName, typeName, fieldName, obj);
+    }
+
+    /**
+     * Get field value from POJO for use as query parameter. Simple types are returned as is.
+     *
+     * @param cacheName Cache name.
+     * @param typeName Type name.
+     * @param fieldName Field name.
+     * @param obj Cache object.
+     * @return Field value from object.
+     * @throws CacheException If type metadata or getter is missing, or if the getter invocation fails.
+     */
+    @Nullable private Object extractPojoParameter(@Nullable String cacheName, String typeName, String fieldName,
+        Object obj) throws CacheException {
+        Map<String, PojoMethodsCache> cacheMethods = pojoMethods.get(cacheName);
+
+        if (cacheMethods == null)
+            throw new CacheException("Failed to find POJO type metadata for cache: " + cacheName);
+
+        PojoMethodsCache mc = cacheMethods.get(typeName);
+
+        if (mc == null)
+            throw new CacheException("Failed to find POJO type metadata for type: " + typeName);
+
+        if (mc.simple)
+            return obj;
+
+        Method getter = mc.getters.get(fieldName);
+
+        if (getter == null)
+            throw new CacheException("Failed to find getter in POJO class [clsName=" + typeName +
+                ", prop=" + fieldName + "]");
+
+        try {
+            return getter.invoke(obj);
+        }
+        catch (Exception e) {
+            throw new CacheException("Failed to read object of class: " + typeName, e);
+        }
+    }
+
+    /**
+     * Reads a named field from a portable object so it can be bound as a query parameter.
+     *
+     * @param fieldName Name of the field to read.
+     * @param obj Object to read from; must be a {@link PortableObject}.
+     * @return Value of the field in the given object.
+     * @throws CacheException If the given object is not a portable object.
+     */
+    private Object extractPortableParameter(String fieldName, Object obj) throws CacheException {
+        if (!(obj instanceof PortableObject))
+            throw new CacheException("Failed to read property value from non portable object [class name=" +
+                obj.getClass() + ", property=" + fieldName + "]");
+
+        PortableObject portable = (PortableObject)obj;
+
+        return portable.field(fieldName);
+    }
+
+
+    /**
+     * Creates a cache object from the current row of a query result, as portable or POJO depending on type settings.
+     *
+     * @param <R> Type of result object.
+     * @param cacheName Cache name.
+     * @param typeName Type name.
+     * @param fields Fields descriptors.
+     * @param loadColIdxs Select query columns index.
+     * @param rs ResultSet to read column values from.
+     * @return Constructed object, cast to the requested type without a runtime check.
+     * @throws CacheLoaderException If failed to construct cache object.
+     */
+    private <R> R buildObject(@Nullable String cacheName, String typeName,
+        CacheJdbcPojoStoreTypeField[] fields, Map<String, Integer> loadColIdxs, ResultSet rs)
+        throws CacheLoaderException {
+        if (isKeepSerialized(cacheName, typeName))
+            return (R)buildPortableObject(cacheName, typeName, fields, loadColIdxs, rs);
+        return (R)buildPojoObject(cacheName, typeName, fields, loadColIdxs, rs);
+    }
+
+    /**
+     * Creates a POJO from the current row of a query result using the reflection metadata cached for the type.
+     *
+     * @param cacheName Cache name.
+     * @param typeName Type name.
+     * @param fields Fields descriptors.
+     * @param loadColIdxs Select query columns index.
+     * @param rs ResultSet to read column values from.
+     * @return Constructed POJO.
+     * @throws CacheLoaderException If metadata is missing or the POJO cannot be constructed or populated.
+     */
+    private Object buildPojoObject(@Nullable String cacheName, String typeName,
+        CacheJdbcPojoStoreTypeField[] fields, Map<String, Integer> loadColIdxs, ResultSet rs)
+        throws CacheLoaderException {
+        // Look up the reflection metadata registered for this cache and type.
+        Map<String, PojoMethodsCache> cacheMethods = pojoMethods.get(cacheName);
+
+        if (cacheMethods == null)
+            throw new CacheLoaderException("Failed to find POJO types metadata for cache: " + cacheName);
+
+        PojoMethodsCache methods = cacheMethods.get(typeName);
+
+        if (methods == null)
+            throw new CacheLoaderException("Failed to find POJO type metadata for type: " + typeName);
+
+        try {
+            // Simple type: the value is read from the column of the first field descriptor.
+            if (methods.simple)
+                return getColumnValue(rs, loadColIdxs.get(fields[0].getDatabaseFieldName()), methods.cls);
+
+            // Otherwise instantiate the class and populate it field by field through setters.
+            Object pojo = methods.ctor.newInstance();
+
+            for (CacheJdbcPojoStoreTypeField fld : fields) {
+                String javaName = fld.getJavaFieldName();
+
+                Method setter = methods.setters.get(javaName);
+
+                if (setter == null)
+                    throw new IllegalStateException("Failed to find setter in POJO class [clsName=" + typeName +
+                        ", prop=" + javaName + "]");
+
+                String dbName = fld.getDatabaseFieldName();
+
+                Integer colIdx = loadColIdxs.get(dbName);
+
+                try {
+                    setter.invoke(pojo, getColumnValue(rs, colIdx, fld.getJavaFieldType()));
+                }
+                catch (Exception e) {
+                    // Add column details here; the outer handler converts this to CacheLoaderException.
+                    throw new IllegalStateException("Failed to set property in POJO class [clsName=" + typeName +
+                        ", prop=" + javaName + ", col=" + colIdx + ", dbName=" + dbName + "]", e);
+                }
+            }
+
+            return pojo;
+        }
+        catch (SQLException e) {
+            throw new CacheLoaderException("Failed to read object of class: " + typeName, e);
+        }
+        catch (Exception e) {
+            throw new CacheLoaderException("Failed to construct instance of class: " + typeName, e);
+        }
+    }
+
+    /**
+     * Construct portable object from query result using the builder cached for the given cache and type.
+     *
+     * @param cacheName Cache name.
+     * @param typeName Type name.
+     * @param fields Fields descriptors.
+     * @param loadColIdxs Select query columns index.
+     * @param rs ResultSet.
+     * @return Constructed portable object.
+     * @throws CacheException If no builder is registered for the cache or type, or if reading a column fails.
+     */
+    protected PortableObject buildPortableObject(String cacheName, String typeName, CacheJdbcPojoStoreTypeField[] fields,
+        Map<String, Integer> loadColIdxs, ResultSet rs) throws CacheException {
+        Map<String, PortableBuilder> cachePortables = portableBuilders.get(cacheName);
+
+        if (cachePortables == null)
+            throw new CacheException("Failed to find portable builders for cache: " + cacheName);
+
+        PortableBuilder builder = cachePortables.get(typeName);
+
+        if (builder == null)
+            throw new CacheException("Failed to find portable builder for type: " + typeName);
+
+        try {
+            for (CacheJdbcPojoStoreTypeField field : fields) {
+                Class<?> type = field.getJavaFieldType();
+
+                Integer colIdx = loadColIdxs.get(field.getDatabaseFieldName());
+
+                builder.setField(field.getJavaFieldName(), getColumnValue(rs, colIdx, type));
+            }
+
+            return builder.build();
+        }
+        catch (SQLException e) {
+            throw new CacheException("Failed to read portable object", e);
+        }
+    }
+
+    /**
+     * Determines the type ID of an object: the class for a POJO, the portable type ID for a portable object.
+     *
+     * @param obj Object to calculate type ID for.
+     * @return Type ID.
+     * @throws CacheException If failed to calculate type ID for given object.
+     */
+    private Object typeIdForObject(Object obj) throws CacheException {
+        if (!(obj instanceof PortableObject))
+            return obj.getClass();
+
+        return ((PortableObject)obj).typeId();
+    }
+
+    /**
+     * Resolves the type ID for a type name: the loaded class for a POJO, the portable type ID otherwise.
+     *
+     * @param keepSerialized If {@code true} then calculate type ID for portable object otherwise for POJO.
+     * @param typeName String description of type name.
+     * @return Type ID.
+     * @throws CacheException If the POJO class with the given name cannot be found.
+     */
+    private Object typeIdForTypeName(boolean keepSerialized, String typeName) throws CacheException {
+        if (!keepSerialized) {
+            try {
+                return Class.forName(typeName);
+            }
+            catch (ClassNotFoundException e) {
+                throw new CacheException("Failed to find class: " + typeName, e);
+            }
+        }
+        return ignite().portables().typeId(typeName);
+    }
+
+    /**
+     * Creates method caches for the key and value classes of every type that is not kept serialized
+     * and registers them for the given cache.
+     *
+     * @param cacheName Cache name to prepare builders for.
+     * @param types Collection of types.
+     * @throws CacheException If failed to prepare internal builders for types.
+     */
+    private void preparePojoBuilders(@Nullable String cacheName, Collection<CacheJdbcPojoStoreType> types)
+        throws CacheException {
+        Map<String, PojoMethodsCache> mtdsByType = U.newHashMap(types.size() * 2);
+
+        for (CacheJdbcPojoStoreType storeType : types) {
+            // Types kept in serialized form are not handled here.
+            if (storeType.isKeepSerialized())
+                continue;
+
+            // TODO check for duplicates
+
+            String keyCls = storeType.getKeyType();
+            mtdsByType.put(keyCls, new PojoMethodsCache(keyCls, storeType.getKeyFields()));
+
+            String valCls = storeType.getValueType();
+            mtdsByType.put(valCls, new PojoMethodsCache(valCls, storeType.getValueFields()));
+        }
+
+        if (mtdsByType.isEmpty())
+            return;
+
+        // Copy the current registry, add this cache and replace the reference.
+        Map<String, Map<String, PojoMethodsCache>> updated = new HashMap<>(pojoMethods);
+
+        updated.put(cacheName, mtdsByType);
+
+        pojoMethods = updated;
+    }
+
+    /**
+     * Creates portable builders for the key and value types of every type that is kept serialized
+     * and registers them for the given cache.
+     *
+     * @param cacheName Cache name to prepare builders for.
+     * @param types Collection of types.
+     * @throws CacheException If failed to prepare internal builders for types.
+     */
+    private void preparePortableBuilders(@Nullable String cacheName, Collection<CacheJdbcPojoStoreType> types)
+        throws CacheException {
+        Map<String, PortableBuilder> buildersByType = U.newHashMap(types.size() * 2);
+
+        for (CacheJdbcPojoStoreType storeType : types) {
+            // Only types kept in serialized form get a portable builder.
+            if (!storeType.isKeepSerialized())
+                continue;
+
+            IgnitePortables portables = ignite().portables();
+
+            String keyTypeName = storeType.getKeyType();
+            buildersByType.put(keyTypeName, portables.builder(portables.typeId(keyTypeName)));
+
+            String valTypeName = storeType.getValueType();
+            buildersByType.put(valTypeName, portables.builder(portables.typeId(valTypeName)));
+        }
+
+        if (buildersByType.isEmpty())
+            return;
+
+        // Copy the current registry, add this cache and replace the reference.
+        Map<String, Map<String, PortableBuilder>> updated = new HashMap<>(portableBuilders);
+
+        updated.put(cacheName, buildersByType);
+
+        portableBuilders = updated;
+    }
+
+    /**
+     * Perform dialect resolution based on the database product name reported by the JDBC driver.
+     * <p>
+     * If the product name is unknown or not reported at all, {@link BasicJdbcDialect} is used.
+     *
+     * @return The resolved dialect.
+     * @throws CacheException Indicates problems accessing the metadata.
+     */
+    private JdbcDialect resolveDialect() throws CacheException {
+        Connection conn = null;
+
+        String dbProductName = null;
+
+        try {
+            conn = openConnection(false);
+
+            dbProductName = conn.getMetaData().getDatabaseProductName();
+        }
+        catch (SQLException e) {
+            throw new CacheException("Failed access to metadata for detect database dialect.", e);
+        }
+        finally {
+            U.closeQuiet(conn);
+        }
+
+        // Guard against a null product name: the prefix checks below would otherwise fail with NPE.
+        if (dbProductName != null) {
+            if ("H2".equals(dbProductName))
+                return new H2Dialect();
+
+            if ("MySQL".equals(dbProductName))
+                return new MySQLDialect();
+
+            if (dbProductName.startsWith("Microsoft SQL Server"))
+                return new SQLServerDialect();
+
+            if ("Oracle".equals(dbProductName))
+                return new OracleDialect();
+
+            if (dbProductName.startsWith("DB2/"))
+                return new DB2Dialect();
+        }
+
+        U.warn(log, "Failed to resolve dialect (BasicJdbcDialect will be used): " + dbProductName);
+
+        return new BasicJdbcDialect();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() throws IgniteException {
+        if (dataSrc == null)
+            throw new IgniteException("Failed to initialize cache store (data source is not provided).");
+
+        if (dialect == null) {
+            dialect = resolveDialect();
+
+            if (log.isDebugEnabled() && dialect.getClass() != BasicJdbcDialect.class)
+                log.debug("Resolved database dialect: " + U.getSimpleName(dialect.getClass()));
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop() throws IgniteException {
+        // No-op: this implementation performs no work on stop.
+    }
+
+    /**
+     * Obtains a connection from the configured data source and applies the requested autocommit mode.
+     *
+     * @param autocommit {@code true} If connection should use autocommit mode.
+     * @return Connection obtained from the data source.
+     * @throws SQLException In case of error.
+     */
+    protected Connection openConnection(boolean autocommit) throws SQLException {
+        Connection opened = dataSrc.getConnection();
+
+        opened.setAutoCommit(autocommit);
+
+        return opened;
+    }
+
+    /**
+     * Returns the connection to use for the current store session.
+     * <p>
+     * Without a transaction a new autocommit connection is opened on every call. Within a transaction
+     * a single non-autocommit connection is opened on first use and kept in the session properties.
+     *
+     * @return Connection.
+     * @throws SQLException In case of error.
+     */
+    private Connection connection() throws SQLException {
+        CacheStoreSession storeSes = session();
+
+        // Transaction can be null in case of simple load operation.
+        if (storeSes.transaction() == null)
+            return openConnection(true);
+
+        Map<String, Connection> sesProps = storeSes.properties();
+
+        Connection txConn = sesProps.get(ATTR_CONN_PROP);
+
+        if (txConn != null)
+            return txConn;
+
+        txConn = openConnection(false);
+
+        // Store connection in session to use it for other operations in the same session.
+        sesProps.put(ATTR_CONN_PROP, txConn);
+
+        return txConn;
+    }
+
+    /**
+     * Closes the connection unless the current session has a transaction.
+     *
+     * @param conn Connection to close.
+     */
+    private void closeConnection(@Nullable Connection conn) {
+        boolean inTx = session().transaction() != null;
+
+        // Within a transaction the connection is left open here.
+        if (inTx)
+            return;
+
+        U.closeQuiet(conn);
+    }
+
+    /**
+     * Closes allocated resources depending on transaction status: the statement is always closed,
+     * the connection is passed to {@code closeConnection(Connection)}.
+     *
+     * @param conn Allocated connection.
+     * @param st Created statement.
+     */
+    private void end(@Nullable Connection conn, @Nullable Statement st) {
+        U.closeQuiet(st);
+
+        closeConnection(conn);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void sessionEnd(boolean commit) throws CacheWriterException {
+        CacheStoreSession storeSes = session();
+
+        Transaction tx = storeSes.transaction();
+
+        // Nothing to finish when the session has no transaction.
+        if (tx == null)
+            return;
+
+        // Take the connection out of the session properties before finishing the transaction.
+        Connection txConn = storeSes.<String, Connection>properties().remove(ATTR_CONN_PROP);
+
+        assert txConn != null;
+
+        try {
+            if (!commit)
+                txConn.rollback();
+            else
+                txConn.commit();
+        }
+        catch (SQLException e) {
+            throw new CacheWriterException(
+                "Failed to end transaction [xid=" + tx.xid() + ", commit=" + commit + ']', e);
+        }
+        finally {
+            U.closeQuiet(txConn);
+        }
+
+        if (log.isDebugEnabled())
+            log.debug("Transaction ended [xid=" + tx.xid() + ", commit=" + commit + ']');
+    }
+
+    /**
+     * Reads the designated column of the current row of the given <code>ResultSet</code> and converts it
+     * to the requested Java data type.
+     * <p>
+     * A {@code null} column value is returned as {@code null} for every requested type. Values for types
+     * that are not handled explicitly are returned as read by {@code ResultSet.getObject(int)}.
+     *
+     * @param rs Result set.
+     * @param colIdx Column index in result set.
+     * @param type Class representing the Java data type to convert the designated column to.
+     * @return Value in column.
+     * @throws SQLException If reading the column from the result set fails.
+     */
+    private Object getColumnValue(ResultSet rs, int colIdx, Class<?> type) throws SQLException {
+        Object raw = rs.getObject(colIdx);
+
+        if (raw == null)
+            return null;
+
+        // Primitive targets and booleans are read through the typed JDBC getters.
+        if (type == int.class)
+            return rs.getInt(colIdx);
+
+        if (type == long.class)
+            return rs.getLong(colIdx);
+
+        if (type == double.class)
+            return rs.getDouble(colIdx);
+
+        if (type == boolean.class || type == Boolean.class)
+            return rs.getBoolean(colIdx);
+
+        if (type == byte.class)
+            return rs.getByte(colIdx);
+
+        if (type == short.class)
+            return rs.getShort(colIdx);
+
+        if (type == float.class)
+            return rs.getFloat(colIdx);
+
+        // Boxed numeric targets are narrowed or widened from the value the driver returned.
+        if (type == Integer.class)
+            return ((Number)raw).intValue();
+
+        if (type == Long.class)
+            return ((Number)raw).longValue();
+
+        if (type == Double.class)
+            return ((Number)raw).doubleValue();
+
+        if (type == Byte.class)
+            return ((Number)raw).byteValue();
+
+        if (type == Short.class)
+            return ((Number)raw).shortValue();
+
+        if (type == Float.class)
+            return ((Number)raw).floatValue();
+
+        // Everything except UUID is returned as is, as well as values that already are UUIDs.
+        if (type != UUID.class || raw instanceof UUID)
+            return raw;
+
+        if (raw instanceof byte[]) {
+            // The first long holds the most significant bits, the second the least significant ones.
+            ByteBuffer buf = ByteBuffer.wrap((byte[])raw);
+
+            long mostSigBits = buf.getLong();
+            long leastSigBits = buf.getLong();
+
+            return new UUID(mostSigBits, leastSigBits);
+        }
+
+        if (raw instanceof String)
+            return UUID.fromString((String)raw);
+
+        return raw;
+    }
+
+    /**
+     * Creates a task that loads entries of one type mapping, optionally restricted to a key range,
+     * and passes every loaded key-value pair to the closure.
+     *
+     * @param em Type mapping description.
+     * @param clo Closure that will be applied to loaded values.
+     * @param lowerBound Lower bound for range, {@code null} if the range has no lower bound.
+     * @param upperBound Upper bound for range, {@code null} if the range has no upper bound.
+     * @return Callable for pool submit.
+     */
+    private Callable<Void> loadCacheRange(final EntryMapping em, final IgniteBiInClosure<K, V> clo,
+        @Nullable final Object[] lowerBound, @Nullable final Object[] upperBound) {
+        return new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                Connection conn = null;
+
+                PreparedStatement stmt = null;
+
+                try {
+                    conn = openConnection(true);
+
+                    boolean hasLower = lowerBound != null;
+                    boolean hasUpper = upperBound != null;
+
+                    String qry;
+
+                    // Without bounds the plain load query is used.
+                    if (hasLower || hasUpper)
+                        qry = em.loadCacheRangeQuery(hasLower, hasUpper);
+                    else
+                        qry = em.loadCacheQry;
+
+                    stmt = conn.prepareStatement(qry);
+
+                    int paramIdx = 1;
+
+                    // Each bound is bound as successively shorter prefixes of its values.
+                    if (hasLower) {
+                        for (int prefixLen = lowerBound.length; prefixLen >= 1; prefixLen--) {
+                            for (int pos = 0; pos < prefixLen; pos++)
+                                stmt.setObject(paramIdx++, lowerBound[pos]);
+                        }
+                    }
+
+                    if (hasUpper) {
+                        for (int prefixLen = upperBound.length; prefixLen >= 1; prefixLen--) {
+                            for (int pos = 0; pos < prefixLen; pos++)
+                                stmt.setObject(paramIdx++, upperBound[pos]);
+                        }
+                    }
+
+                    ResultSet rows = stmt.executeQuery();
+
+                    while (rows.next()) {
+                        K loadedKey = buildObject(em.cacheName, em.keyType(), em.keyColumns(), em.loadColIdxs, rows);
+                        V loadedVal = buildObject(em.cacheName, em.valueType(), em.valueColumns(), em.loadColIdxs, rows);
+
+                        clo.apply(loadedKey, loadedVal);
+                    }
+                }
+                catch (SQLException e) {
+                    throw new IgniteCheckedException("Failed to load cache", e);
+                }
+                finally {
+                    U.closeQuiet(stmt);
+
+                    U.closeQuiet(conn);
+                }
+
+                return null;
+            }
+        };
+    }
+
+    /**
+     * Construct load cache in one select. Delegates to the range loader with no lower and no upper bound.
+     *
+     * @param m Type mapping description.
+     * @param clo Closure for loaded values.
+     * @return Callable for pool submit.
+     */
+    private Callable<Void> loadCacheFull(EntryMapping m, IgniteBiInClosure<K, V> clo) {
+        return loadCacheRange(m, clo, null, null);
+    }
+
+    /**
+     * Checks whether the class is a number, string, date, boolean or UUID type (or a subtype of one of them).
+     *
+     * @param cls Class.
+     * @return {@code True} if object is a simple type.
+     */
+    private static boolean simpleType(Class<?> cls) {
+        Class<?>[] simpleBases = {Number.class, String.class, java.util.Date.class, Boolean.class, UUID.class};
+
+        for (Class<?> base : simpleBases) {
+            if (base.isAssignableFrom(cls))
+                return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Validates field descriptors of a mapped type.
+     * <p>
+     * A simple type must be described by exactly one field with a database name; that field gets the class
+     * as its Java type. For any other type every field must have a database name, a Java field name and
+     * a Java field type.
+     *
+     * @param cacheName Cache name to check mapping for.
+     * @param clsName Class name.
+     * @param fields Fields descriptors.
+     * @throws CacheException If failed to check type metadata.
+     */
+    private static void checkMapping(@Nullable String cacheName, String clsName,
+        CacheJdbcPojoStoreTypeField[] fields) throws CacheException {
+        Class<?> mappedCls;
+
+        try {
+            mappedCls = Class.forName(clsName);
+        }
+        catch (ClassNotFoundException e) {
+            throw new CacheException("Failed to find class: " + clsName, e);
+        }
+
+        if (simpleType(mappedCls)) {
+            if (fields.length != 1)
+                throw new CacheException("More than one field for simple type [cache name=" + cacheName
+                    + ", type=" + clsName + " ]");
+
+            CacheJdbcPojoStoreTypeField single = fields[0];
+
+            if (single.getDatabaseFieldName() == null)
+                throw new CacheException("Missing database name in mapping description [cache name=" + cacheName
+                    + ", type=" + clsName + " ]");
+
+            single.setJavaFieldType(mappedCls);
+
+            return;
+        }
+
+        for (int i = 0; i < fields.length; i++) {
+            CacheJdbcPojoStoreTypeField fld = fields[i];
+
+            if (fld.getDatabaseFieldName() == null)
+                throw new CacheException("Missing database name in mapping description [cache name=" + cacheName
+                    + ", type=" + clsName + " ]");
+
+            if (fld.getJavaFieldName() == null)
+                throw new CacheException("Missing field name in mapping description [cache name=" + cacheName
+                    + ", type=" + clsName + " ]");
+
+            if (fld.getJavaFieldType() == null)
+                throw new CacheException("Missing field type in mapping description [cache name=" + cacheName
+                    + ", type=" + clsName + " ]");
+        }
+    }
+
+    /**
+     * Converts field descriptors in the old format into descriptors in the new format, keeping their order.
+     * Exists for backward compatibility.
+     *
+     * @param oldFields Fields in old format.
+     * @return Fields in new format.
+     */
+    @Deprecated
+    private CacheJdbcPojoStoreTypeField[] translateFields(Collection<CacheTypeFieldMetadata> oldFields) {
+        CacheJdbcPojoStoreTypeField[] translated = new CacheJdbcPojoStoreTypeField[oldFields.size()];
+
+        int pos = 0;
+
+        for (CacheTypeFieldMetadata src : oldFields)
+            translated[pos++] = new CacheJdbcPojoStoreTypeField(src.getDatabaseType(), src.getDatabaseName(),
+                src.getJavaType(), src.getJavaName());
+
+        return translated;
+    }
+
+    /**
+     * Returns the table of entry mappings (keyed by key type ID) for the given cache, building it on first use.
+     * <p>
+     * Building validates the field descriptors of every configured type of the cache and prepares the POJO and
+     * portable builders. The result is cached only when at least one type is configured for the cache.
+     *
+     * @param cacheName Cache name to check mappings for.
+     * @return Type mappings for specified cache name.
+     * @throws CacheException If failed to initialize cache mappings.
+     */
+    private Map<Object, EntryMapping> cacheMappings(@Nullable String cacheName) throws CacheException {
+        // Fast path: mappings for this cache were already built.
+        Map<Object, EntryMapping> entryMappings = cacheMappings.get(cacheName);
+
+        if (entryMappings != null)
+            return entryMappings;
+
+        cacheMappingsLock.lock();
+
+        try {
+            // Check again under the lock: the mappings may have been built while waiting for it.
+            entryMappings = cacheMappings.get(cacheName);
+
+            if (entryMappings != null)
+                return entryMappings;
+
+            // If no types configured, check CacheTypeMetadata for backward compatibility.
+            // NOTE(review): the translated array is assigned to 'types', so this branch is skipped on later
+            // calls and type metadata of other caches is not consulted - confirm this is intended.
+            if (types == null) {
+                CacheConfiguration ccfg = ignite().cache(cacheName).getConfiguration(CacheConfiguration.class);
+
+                // NOTE(review): assumes the type metadata collection is not null here - confirm.
+                Collection<CacheTypeMetadata> oldTypes = ccfg.getTypeMetadata();
+
+                types = new CacheJdbcPojoStoreType[oldTypes.size()];
+
+                int idx = 0;
+
+                for (CacheTypeMetadata oldType : oldTypes) {
+                    CacheJdbcPojoStoreType newType = new CacheJdbcPojoStoreType();
+
+                    newType.setCacheName(cacheName);
+
+                    newType.setDatabaseSchema(oldType.getDatabaseSchema());
+                    newType.setDatabaseTable(oldType.getDatabaseTable());
+
+                    newType.setKeyType(oldType.getKeyType());
+                    newType.setKeyFields(translateFields(oldType.getKeyFields()));
+
+                    newType.setValueType(oldType.getValueType());
+                    newType.setValueFields(translateFields(oldType.getValueFields()));
+
+                    types[idx] = newType;
+
+                    idx++;
+                }
+            }
+
+            List<CacheJdbcPojoStoreType> cacheTypes = new ArrayList<>(types.length);
+
+            // Select the types configured for this cache; a null cache name matches only types without a cache name.
+            for (CacheJdbcPojoStoreType type : types)
+                if ((cacheName != null && cacheName.equals(type.getCacheName())) ||
+                    (cacheName == null && type.getCacheName() == null))
+                    cacheTypes.add(type);
+
+            entryMappings = U.newHashMap(cacheTypes.size());
+
+            if (!cacheTypes.isEmpty()) {
+                // Type name -> keepSerialized flag for every key and value type of this cache.
+                Map<String, Boolean> tk = new HashMap<>(cacheTypes.size() * 2);
+
+                for (CacheJdbcPojoStoreType type : cacheTypes) {
+                    boolean keepSerialized = type.isKeepSerialized();
+
+                    String keyType = type.getKeyType();
+                    String valType = type.getValueType();
+
+                    tk.put(keyType, keepSerialized);
+                    tk.put(valType, keepSerialized);
+
+                    Object keyTypeId = typeIdForTypeName(keepSerialized, keyType);
+
+                    // Mappings are looked up by key type ID, so it must not repeat within one cache.
+                    if (entryMappings.containsKey(keyTypeId))
+                        throw new CacheException("Key type must be unique in type metadata [cache name=" + cacheName +
+                            ", key type=" + keyType + "]");
+
+                    checkMapping(cacheName, keyType, type.getKeyFields());
+                    checkMapping(cacheName, valType, type.getValueFields());
+
+                    entryMappings.put(keyTypeId, new EntryMapping(cacheName, dialect, type));
+                }
+
+                keepSerializedTypes.put(cacheName, tk);
+
+                // Work on a copy of the registry; the reference is replaced only after the builders are prepared.
+                Map<String, Map<Object, EntryMapping>> mappings = new HashMap<>(cacheMappings);
+                mappings.put(cacheName, entryMappings);
+
+                preparePojoBuilders(cacheName, cacheTypes);
+                preparePortableBuilders(cacheName, cacheTypes);
+
+                cacheMappings = mappings;
+            }
+
+            // When no types match the cache, an empty map is returned and nothing is cached.
+            return entryMappings;
+        }
+        finally {
+            cacheMappingsLock.unlock();
+        }
+    }
+
+    /**
+     * Looks up the entry mapping registered for the given key type in the given cache.
+     *
+     * @param cacheName Cache name.
+     * @param keyTypeId Key type id.
+     * @param key Key object, used only in the error message.
+     * @return Entry mapping.
+     * @throws CacheException If mapping for key was not found.
+     */
+    private EntryMapping entryMapping(String cacheName, Object keyTypeId, Object key) throws CacheException {
+        EntryMapping found = cacheMappings(cacheName).get(keyTypeId);
+
+        if (found != null)
+            return found;
+
+        String maskedCacheName = U.maskName(cacheName);
+
+        throw new CacheException("Failed to find mapping description [key=" + key +
+            ", cache=" + maskedCacheName + "]. Please configure CacheJdbcPojoStoreType to associate '" +
+            maskedCacheName + "' with JdbcPojoStore.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void loadCache(final IgniteBiInClosure<K, V> clo, @Nullable Object... args)
+        throws CacheLoaderException {
+        ExecutorService pool = null;
+
+        String cacheName = session().cacheName();
+
+        try {
+            pool = Executors.newFixedThreadPool(maxPoolSz);
+
+            Collection<Future<?>> futs = new ArrayList<>();
+
+            if (args != null && args.length > 0) {
+                // Arguments are expected as pairs: key type name followed by the select query to run for it.
+                if (args.length % 2 != 0)
+                    throw new CacheLoaderException("Expected even number of arguments, but found: " + args.length);
+
+                if (log.isDebugEnabled())
+                    log.debug("Start loading entries from db using user queries from arguments");
+
+                for (int i = 0; i < args.length; i += 2) {
+                    String keyType = args[i].toString();
+
+                    String selQry = args[i + 1].toString();
+
+                    // We must build cache mappings first.
+                    cacheMappings(cacheName);
+
+                    EntryMapping em = entryMapping(cacheName, typeIdForTypeName(isKeepSerialized(cacheName, keyType),
+                        keyType), keyType);
+
+                    futs.add(pool.submit(new LoadCacheCustomQueryWorker<>(em, selQry, clo)));
+                }
+            }
+            else {
+                Collection<EntryMapping> entryMappings = cacheMappings(cacheName).values();
+
+                for (EntryMapping em : entryMappings) {
+                    if (parallelLoadCacheMinThreshold > 0) {
+                        if (log.isDebugEnabled())
+                            log.debug("Multithread loading entries from db [cache name=" + cacheName +
+                                ", key type=" + em.keyType() + " ]");
+
+                        Connection conn = null;
+
+                        PreparedStatement stmt = null;
+
+                        try {
+                            conn = connection();
+
+                            // Query the key values that split the table into ranges.
+                            stmt = conn.prepareStatement(em.loadCacheSelRangeQry);
+
+                            stmt.setInt(1, parallelLoadCacheMinThreshold);
+
+                            ResultSet rs = stmt.executeQuery();
+
+                            if (rs.next()) {
+                                int keyCnt = em.keyCols.size();
+
+                                Object[] upperBound = new Object[keyCnt];
+
+                                for (int i = 0; i < keyCnt; i++)
+                                    upperBound[i] = rs.getObject(i + 1);
+
+                                // First range: everything up to the first boundary.
+                                futs.add(pool.submit(loadCacheRange(em, clo, null, upperBound)));
+
+                                while (rs.next()) {
+                                    // The previous upper bound becomes the lower bound of the next range.
+                                    Object[] lowerBound = upperBound;
+
+                                    upperBound = new Object[keyCnt];
+
+                                    for (int i = 0; i < keyCnt; i++)
+                                        upperBound[i] = rs.getObject(i + 1);
+
+                                    futs.add(pool.submit(loadCacheRange(em, clo, lowerBound, upperBound)));
+                                }
+
+                                // Last range: everything after the last boundary.
+                                futs.add(pool.submit(loadCacheRange(em, clo, upperBound, null)));
+                            }
+                            else
+                                futs.add(pool.submit(loadCacheFull(em, clo)));
+                        }
+                        catch (SQLException ignored) {
+                            // Best effort: if range boundaries cannot be obtained, fall back to a full load.
+                            futs.add(pool.submit(loadCacheFull(em, clo)));
+                        }
+                        finally {
+                            // Closes the statement and, unless the session has a transaction, the connection.
+                            end(conn, stmt);
+                        }
+                    }
+                    else {
+                        if (log.isDebugEnabled())
+                            log.debug("Single thread loading entries from db [cache name=" + cacheName +
+                                ", key type=" + em.keyType() + " ]");
+
+                        futs.add(pool.submit(loadCacheFull(em, clo)));
+                    }
+                }
+            }
+
+            // Wait for all submitted load tasks.
+            for (Future<?> fut : futs)
+                U.get(fut);
+
+            if (log.isDebugEnabled())
+                log.debug("Cache loaded from db: " + cacheName);
+        }
+        catch (IgniteCheckedException e) {
+            throw new CacheLoaderException("Failed to load cache: " + cacheName, e.getCause());
+        }
+        finally {
+            U.shutdownNow(getClass(), pool, log);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public V load(K key) throws CacheLoaderException {
+        assert key != null;
+
+        EntryMapping em = entryMapping(session().cacheName(), typeIdForObject(key), key);
+
+        if (log.isDebugEnabled())
+            log.debug("Load value from db [table= " + em.fullTableName() + ", key=" + key + "]");
+
+        Connection conn = null;
+
+        PreparedStatement selStmt = null;
+
+        try {
+            conn = connection();
+
+            selStmt = conn.prepareStatement(em.loadQrySingle);
+
+            fillKeyParameters(selStmt, em, key);
+
+            ResultSet row = selStmt.executeQuery();
+
+            // No row for the key means there is no value to load.
+            if (!row.next())
+                return null;
+
+            return buildObject(em.cacheName, em.valueType(), em.valueColumns(), em.loadColIdxs, row);
+        }
+        catch (SQLException e) {
+            throw new CacheLoaderException("Failed to load object [table=" + em.fullTableName() +
+                ", key=" + key + "]", e);
+        }
+        finally {
+            end(conn, selStmt);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<K, V> loadAll(Iterable<? extends K> keys) throws CacheLoaderException {
+        assert keys != null;
+
+        Connection conn = null;
+
+        try {
+            conn = connection();
+
+            String cacheName = session().cacheName();
+
+            // One pending worker per key type; it collects keys until a full batch is reached.
+            Map<Object, LoadWorker<K, V>> pending = U.newHashMap(cacheMappings(cacheName).size());
+
+            Map<K, V> loaded = new HashMap<>();
+
+            for (K key : keys) {
+                Object keyTypeId = typeIdForObject(key);
+
+                EntryMapping em = entryMapping(cacheName, keyTypeId, key);
+
+                LoadWorker<K, V> worker = pending.get(keyTypeId);
+
+                if (worker == null) {
+                    worker = new LoadWorker<>(conn, em);
+
+                    pending.put(keyTypeId, worker);
+                }
+
+                worker.keys.add(key);
+
+                // A full batch is loaded right away; the next key of this type starts a new worker.
+                if (worker.keys.size() == em.maxKeysPerStmt) {
+                    pending.remove(keyTypeId);
+
+                    loaded.putAll(worker.call());
+                }
+            }
+
+            // Load the remaining, partially filled batches.
+            for (LoadWorker<K, V> worker : pending.values())
+                loaded.putAll(worker.call());
+
+            return loaded;
+        }
+        catch (Exception e) {
+            // NOTE(review): a load failure is reported as CacheWriterException - confirm whether
+            // CacheLoaderException was intended.
+            throw new CacheWriterException("Failed to load entries from database", e);
+        }
+        finally {
+            closeConnection(conn);
+        }
+    }
+
+    /**
+     * Writes an entry with an update statement and, if no row was updated, with an insert statement.
+     * <p>
+     * If the insert fails with a unique index or primary key violation (SQL state 23505 or 23000),
+     * the whole update-then-insert sequence is retried, up to {@code maxWrtAttempts} attempts in total.
+     *
+     * @param insStmt Insert statement.
+     * @param updStmt Update statement.
+     * @param em Entry mapping.
+     * @param entry Cache entry.
+     * @throws CacheWriterException If failed to update record in database.
+     */
+    private void writeUpsert(PreparedStatement insStmt, PreparedStatement updStmt,
+        EntryMapping em, Cache.Entry<? extends K, ? extends V> entry) throws CacheWriterException {
+        try {
+            CacheWriterException we = null;
+
+            for (int attempt = 0; attempt < maxWrtAttempts; attempt++) {
+                int paramIdx = fillValueParameters(updStmt, 1, em, entry.getValue());
+
+                fillKeyParameters(updStmt, paramIdx, em, entry.getKey());
+
+                if (updStmt.executeUpdate() != 0) {
+                    if (attempt > 0)
+                        U.warn(log, "Entry was updated in database on second try [table=" + em.fullTableName() +
+                            ", entry=" + entry + "]");
+
+                    return;
+                }
+
+                // No row was updated, so try to insert the entry.
+                paramIdx = fillKeyParameters(insStmt, em, entry.getKey());
+
+                fillValueParameters(insStmt, paramIdx, em, entry.getValue());
+
+                try {
+                    insStmt.executeUpdate();
+
+                    if (attempt > 0)
+                        U.warn(log, "Entry was inserted in database on second try [table=" + em.fullTableName() +
+                            ", entry=" + entry + "]");
+
+                    // Return here so that a successful insert is not also reported as an update.
+                    return;
+                }
+                catch (SQLException e) {
+                    String sqlState = e.getSQLState();
+
+                    SQLException nested = e.getNextException();
+
+                    // Use the first SQL state available in the exception chain.
+                    while (sqlState == null && nested != null) {
+                        sqlState = nested.getSQLState();
+
+                        nested = nested.getNextException();
+                    }
+
+                    // The error with code 23505 or 23000 is thrown when trying to insert a row that
+                    // would violate a unique index or primary key.
+                    if ("23505".equals(sqlState) || "23000".equals(sqlState)) {
+                        if (we == null)
+                            we = new CacheWriterException("Failed insert entry in database, violate a unique" +
+                                " index or primary key [table=" + em.fullTableName() + ", entry=" + entry + "]");
+
+                        we.addSuppressed(e);
+
+                        U.warn(log, "Failed insert entry in database, violate a unique index or primary key" +
+                            " [table=" + em.fullTableName() + ", entry=" + entry + "]");
+
+                        continue;
+                    }
+
+                    throw new CacheWriterException("Failed insert entry in database [table=" + em.fullTableName() +
+                        ", entry=" + entry + "]", e);
+                }
+            }
+
+            // No attempt was made at all (non-positive attempts limit): report it instead of throwing null.
+            if (we == null)
+                we = new CacheWriterException("Failed update entry in database, no write attempts were made" +
+                    " [table=" + em.fullTableName() + ", entry=" + entry + "]");
+
+            throw we;
+        }
+        catch (SQLException e) {
+            throw new CacheWriterException("Failed update entry in database [table=" + em.fullTableName() +
+                ", entry=" + entry + "]", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(Cache.Entry<? extends K, ? extends V> entry) throws CacheWriterException {
+        assert entry != null;
+
+        K key = entry.getKey();
+
+        EntryMapping em = entryMapping(session().cacheName(), typeIdForObject(key), key);
+
+        if (log.isDebugEnabled())
+            log.debug("Start write entry to database [table=" + em.fullTableName() + ", entry=" + entry + "]");
+
+        Connection conn = null;
+
+        try {
+            conn = connection();
+
+            if (dialect.hasMerge()) {
+                // The dialect supports merge: a single statement writes the entry.
+                PreparedStatement stmt = null;
+
+                try {
+                    stmt = conn.prepareStatement(em.mergeQry);
+
+                    int i = fillKeyParameters(stmt, em, key);
+
+                    fillValueParameters(stmt, i, em, entry.getValue());
+
+                    int updCnt = stmt.executeUpdate();
+
+                    if (updCnt != 1)
+                        U.warn(log, "Unexpected number of updated entries [table=" + em.fullTableName() +
+                            ", entry=" + entry + ", expected=1, actual=" + updCnt + "]");
+                }
+                finally {
+                    U.closeQuiet(stmt);
+                }
+            }
+            else {
+                // No merge support: update first and insert if nothing was updated.
+                PreparedStatement insStmt = null;
+
+                PreparedStatement updStmt = null;
+
+                try {
+                    insStmt = conn.prepareStatement(em.insQry);
+
+                    updStmt = conn.prepareStatement(em.updQry);
+
+                    writeUpsert(insStmt, updStmt, em, entry);
+                }
+                finally {
+                    U.closeQuiet(insStmt);
+
+                    U.closeQuiet(updStmt);
+                }
+            }
+        }
+        catch (SQLException e) {
+            throw new CacheWriterException("Failed to write entry to database [table=" + em.fullTableName() +
+                ", entry=" + entry + "]", e);
+        }
+        finally {
+            closeConnection(conn);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * When the dialect reports merge support, entries are added to a batched merge statement that is
+     * executed every {@code batchSz} entries and whenever the key type of the next entry differs from the
+     * current one. Otherwise entries are written one by one through {@code writeUpsert}.
+     *
+     * @throws CacheWriterException If an {@link SQLException} is raised while writing the entries.
+     */
+    @Override public void writeAll(final Collection<Cache.Entry<? extends K, ? extends V>> entries)
+        throws CacheWriterException {
+        assert entries != null;
+
+        Connection conn = null;
+
+        try {
+            conn = connection();
+
+            String cacheName = session().cacheName();
+
+            Object currKeyTypeId = null;
+
+            if (dialect.hasMerge()) {
+                PreparedStatement mergeStmt = null;
+
+                try {
+                    // Mapping of the batch currently being accumulated in mergeStmt.
+                    EntryMapping em = null;
+
+                    // Array view of the entries, only materialized if a warning has to name an entry.
+                    LazyValue<Object[]> lazyEntries = new LazyValue<Object[]>() {
+                        @Override public Object[] create() {
+                            return entries.toArray();
+                        }
+                    };
+
+                    int fromIdx = 0, prepared = 0;
+
+                    for (Cache.Entry<? extends K, ? extends V> entry : entries) {
+                        K key = entry.getKey();
+
+                        Object keyTypeId = typeIdForObject(key);
+
+                        EntryMapping keyEm = entryMapping(cacheName, keyTypeId, key);
+
+                        if (currKeyTypeId == null || !currKeyTypeId.equals(keyTypeId)) {
+                            if (mergeStmt != null) {
+                                // Flush the batch of the previous key type. "em" still refers to that
+                                // type's mapping, so logs and warnings name the right table.
+                                if (log.isDebugEnabled())
+                                    log.debug("Write entries to db [cache name=" + cacheName +
+                                        ", key type=" + em.keyType() + ", count=" + prepared + "]");
+
+                                executeBatch(em, mergeStmt, "writeAll", fromIdx, prepared, lazyEntries);
+
+                                U.closeQuiet(mergeStmt);
+                            }
+
+                            mergeStmt = conn.prepareStatement(keyEm.mergeQry);
+
+                            currKeyTypeId = keyTypeId;
+
+                            fromIdx += prepared;
+
+                            prepared = 0;
+                        }
+
+                        em = keyEm;
+
+                        int i = fillKeyParameters(mergeStmt, em, key);
+
+                        fillValueParameters(mergeStmt, i, em, entry.getValue());
+
+                        mergeStmt.addBatch();
+
+                        if (++prepared % batchSz == 0) {
+                            if (log.isDebugEnabled())
+                                log.debug("Write entries to db [cache name=" + cacheName +
+                                    ", key type=" + em.keyType() + ", count=" + prepared + "]");
+
+                            executeBatch(em, mergeStmt, "writeAll", fromIdx, prepared, lazyEntries);
+
+                            fromIdx += prepared;
+
+                            prepared = 0;
+                        }
+                    }
+
+                    // Flush the remainder that did not fill a whole batch.
+                    if (mergeStmt != null && prepared % batchSz != 0) {
+                        if (log.isDebugEnabled())
+                            log.debug("Write entries to db [cache name=" + cacheName +
+                                ", key type=" + em.keyType() + ", count=" + prepared + "]");
+
+                        executeBatch(em, mergeStmt, "writeAll", fromIdx, prepared, lazyEntries);
+                    }
+                }
+                finally {
+                    U.closeQuiet(mergeStmt);
+                }
+            }
+            else {
+                if (log.isDebugEnabled())
+                    log.debug("Write entries to db one by one using update and insert statements [cache name=" +
+                        cacheName + ", count=" + entries.size() + "]");
+
+                PreparedStatement insStmt = null;
+
+                PreparedStatement updStmt = null;
+
+                try {
+                    for (Cache.Entry<? extends K, ? extends V> entry : entries) {
+                        K key = entry.getKey();
+
+                        Object keyTypeId = typeIdForObject(key);
+
+                        EntryMapping em = entryMapping(cacheName, keyTypeId, key);
+
+                        // Re-prepare both statements whenever the key type changes.
+                        if (currKeyTypeId == null || !currKeyTypeId.equals(keyTypeId)) {
+                            U.closeQuiet(insStmt);
+
+                            insStmt = conn.prepareStatement(em.insQry);
+
+                            U.closeQuiet(updStmt);
+
+                            updStmt = conn.prepareStatement(em.updQry);
+
+                            currKeyTypeId = keyTypeId;
+                        }
+
+                        writeUpsert(insStmt, updStmt, em, entry);
+                    }
+                }
+                finally {
+                    U.closeQuiet(insStmt);
+
+                    U.closeQuiet(updStmt);
+                }
+            }
+        }
+        catch (SQLException e) {
+            throw new CacheWriterException("Failed to write entries in database", e);
+        }
+        finally {
+            closeConnection(conn);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Runs the remove query of the key's entry mapping with the key fields bound as parameters and logs a
+     * warning when the number of deleted rows is not exactly one.
+     *
+     * @throws CacheWriterException If an {@link SQLException} is raised while removing the value.
+     */
+    @Override public void delete(Object key) throws CacheWriterException {
+        assert key != null;
+
+        final EntryMapping mapping = entryMapping(session().cacheName(), typeIdForObject(key), key);
+
+        if (log.isDebugEnabled()) {
+            log.debug("Remove value from db [table=" + mapping.fullTableName() + ", key=" + key + "]");
+        }
+
+        Connection conn = null;
+        PreparedStatement rmvStmt = null;
+
+        try {
+            conn = connection();
+            rmvStmt = conn.prepareStatement(mapping.remQry);
+
+            fillKeyParameters(rmvStmt, mapping, key);
+
+            final int removed = rmvStmt.executeUpdate();
+
+            if (removed != 1) {
+                U.warn(log, "Unexpected number of deleted entries [table=" + mapping.fullTableName() + ", key=" + key +
+                    ", expected=1, actual=" + removed + "]");
+            }
+        }
+        catch (SQLException e) {
+            throw new CacheWriterException("Failed to remove value from database [table=" + mapping.fullTableName() +
+                ", key=" + key + "]", e);
+        }
+        finally {
+            end(conn, rmvStmt);
+        }
+    }
+
+    /**
+     * Executes the batch accumulated in the statement and logs a warning for every unexpected outcome:
+     * a number of returned row counts different from {@code prepared}, or a row count that is neither
+     * {@code 1} nor {@code SUCCESS_NO_INFO}. If the batch fails with {@link BatchUpdateException}, the
+     * entries reported as {@code EXECUTE_FAILED} are logged and the exception is rethrown.
+     *
+     * @param em Entry mapping.
+     * @param stmt Statement holding the batch.
+     * @param desc Statement description for log messages.
+     * @param fromIdx Index in {@code lazyObjs} of the first object in this batch.
+     * @param prepared Expected number of objects in the batch.
+     * @param lazyObjs All objects used in batch statements as array; only evaluated when a warning is logged.
+     * @throws SQLException If failed to execute batch statement.
+     */
+    private void executeBatch(EntryMapping em, Statement stmt, String desc, int fromIdx, int prepared,
+        LazyValue<Object[]> lazyObjs) throws SQLException {
+        final int[] updCnts;
+
+        try {
+            updCnts = stmt.executeBatch();
+        }
+        catch (BatchUpdateException be) {
+            final int[] failedCnts = be.getUpdateCounts();
+
+            for (int pos = 0; pos < failedCnts.length; pos++) {
+                if (failedCnts[pos] != EXECUTE_FAILED)
+                    continue;
+
+                Object[] all = lazyObjs.value();
+
+                U.warn(log, "Batch " + desc + " failed on execution [table=" + em.fullTableName() +
+                    ", entry=" + all[fromIdx + pos] + "]");
+            }
+
+            throw be;
+        }
+
+        final int total = updCnts.length;
+
+        if (total != prepared) {
+            U.warn(log, "Unexpected number of updated rows [table=" + em.fullTableName() + ", expected=" + prepared +
+                ", actual=" + total + "]");
+        }
+
+        for (int pos = 0; pos < total; pos++) {
+            final int updated = updCnts[pos];
+
+            if (updated == 1 || updated == SUCCESS_NO_INFO)
+                continue;
+
+            Object[] all = lazyObjs.value();
+
+            U.warn(log, "Batch " + desc + " returned unexpected updated row count [table=" + em.fullTableName() +
+                ", entry=" + all[fromIdx + pos] + ", expected=1, actual=" + updated + "]");
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Keys are added to a batched remove statement that is executed every {@code batchSz} keys and whenever
+     * the key type of the next key differs from the current one. On a key type change the statement is
+     * closed and prepared again from the new type's remove query.
+     *
+     * @throws CacheWriterException If an {@link SQLException} is raised while removing the values.
+     */
+    @Override public void deleteAll(final Collection<?> keys) throws CacheWriterException {
+        assert keys != null;
+
+        Connection conn = null;
+
+        // Declared outside of try so that it can be closed in finally.
+        PreparedStatement delStmt = null;
+
+        try {
+            conn = connection();
+
+            // Array view of the keys, only materialized if a warning has to name a key.
+            LazyValue<Object[]> lazyKeys = new LazyValue<Object[]>() {
+                @Override public Object[] create() {
+                    return keys.toArray();
+                }
+            };
+
+            String cacheName = session().cacheName();
+
+            Object currKeyTypeId = null;
+
+            // Mapping of the batch currently being accumulated in delStmt.
+            EntryMapping em = null;
+
+            int fromIdx = 0, prepared = 0;
+
+            for (Object key : keys) {
+                Object keyTypeId = typeIdForObject(key);
+
+                EntryMapping keyEm = entryMapping(cacheName, keyTypeId, key);
+
+                if (delStmt == null || !currKeyTypeId.equals(keyTypeId)) {
+                    if (delStmt != null) {
+                        // Flush the batch of the previous key type against that type's mapping.
+                        if (log.isDebugEnabled())
+                            log.debug("Delete entries from db [cache name=" + cacheName +
+                                ", key type=" + em.keyType() + ", count=" + prepared + "]");
+
+                        executeBatch(em, delStmt, "deleteAll", fromIdx, prepared, lazyKeys);
+
+                        U.closeQuiet(delStmt);
+
+                        fromIdx += prepared;
+
+                        prepared = 0;
+                    }
+
+                    // The remove query is specific to the mapping, so it has to be prepared per key type.
+                    delStmt = conn.prepareStatement(keyEm.remQry);
+
+                    currKeyTypeId = keyTypeId;
+                }
+
+                em = keyEm;
+
+                fillKeyParameters(delStmt, em, key);
+
+                delStmt.addBatch();
+
+                if (++prepared % batchSz == 0) {
+                    if (log.isDebugEnabled())
+                        log.debug("Delete entries from db [cache name=" + cacheName +
+                            ", key type=" + em.keyType() + ", count=" + prepared + "]");
+
+                    executeBatch(em, delStmt, "deleteAll", fromIdx, prepared, lazyKeys);
+
+                    fromIdx += prepared;
+
+                    prepared = 0;
+                }
+            }
+
+            // Flush the remainder that did not fill a whole batch.
+            if (delStmt != null && prepared % batchSz != 0) {
+                if (log.isDebugEnabled())
+                    log.debug("Delete entries from db [cache name=" + cacheName +
+                        ", key type=" + em.keyType() + ", count=" + prepared + "]");
+
+                executeBatch(em, delStmt, "deleteAll", fromIdx, prepared, lazyKeys);
+            }
+        }
+        catch (SQLException e) {
+            throw new CacheWriterException("Failed to remove values from database", e);
+        }
+        finally {
+            U.closeQuiet(delStmt);
+
+            closeConnection(conn);
+        }
+    }
+
+    /**
+     * Binds one statement parameter from a field value. A {@code null} value is bound with
+     * {@code setNull} using the field's database type. A value of a field whose Java type is {@link UUID}
+     * is converted to bytes for {@code Types.BINARY} columns and to a string for {@code Types.CHAR} and
+     * {@code Types.VARCHAR} columns; every other value is passed to {@code setObject} unchanged.
+     *
+     * @param stmt Prepare statement.
+     * @param i Index for parameters.
+     * @param field Field descriptor.
+     * @param fieldVal Field value.
+     * @throws CacheException If failed to set statement parameter.
+     */
+    private void fillParameter(PreparedStatement stmt, int i, CacheJdbcPojoStoreTypeField field, @Nullable Object fieldVal)
+        throws CacheException {
+        try {
+            if (fieldVal == null) {
+                stmt.setNull(i, field.getDatabaseFieldType());
+
+                return;
+            }
+
+            Object param = fieldVal;
+
+            if (field.getJavaFieldType() == UUID.class) {
+                final int dbType = field.getDatabaseFieldType();
+
+                if (dbType == Types.BINARY)
+                    param = U.uuidToBytes((UUID)fieldVal);
+                else if (dbType == Types.CHAR || dbType == Types.VARCHAR)
+                    param = fieldVal.toString();
+            }
+
+            stmt.setObject(i, param);
+        }
+        catch (SQLException e) {
+            throw new CacheException("Failed to set statement parameter name: " + field.getDatabaseFieldName(), e);
+        }
+    }
+
+    /**
+     * Binds the key fields of the mapping to consecutive statement parameters, starting at {@code idx}.
+     *
+     * @param stmt Prepare statement.
+     * @param idx Start index for parameters.
+     * @param em Entry mapping.
+     * @param key Key object.
+     * @return Next index for parameters.
+     * @throws CacheException If failed to set statement parameters.
+     */
+    private int fillKeyParameters(PreparedStatement stmt, int idx, EntryMapping em,
+        Object key) throws CacheException {
+        int next = idx;
+
+        for (CacheJdbcPojoStoreTypeField keyField : em.keyColumns()) {
+            Object extracted = extractParameter(em.cacheName, em.keyType(), keyField.getJavaFieldName(), key);
+
+            fillParameter(stmt, next, keyField, extracted);
+
+            next++;
+        }
+
+        return next;
+    }
+
+    /**
+     * Binds the key fields of the mapping to statement parameters, starting at the first parameter.
+     *
+     * @param stmt Prepare statement.
+     * @param m Entry mapping.
+     * @param key Key object.
+     * @return Next index for parameters.
+     * @throws CacheException If failed to set statement parameters.
+     */
+    private int fillKeyParameters(PreparedStatement stmt, EntryMapping m, Object key) throws CacheException {
+        final int firstParamIdx = 1;
+
+        return fillKeyParameters(stmt, firstParamIdx, m, key);
+    }
+
+    /**
+     * Binds the unique value fields of the mapping (value fields that are not also key columns) to
+     * consecutive statement parameters, starting at {@code idx}.
+     *
+     * @param stmt Prepare statement.
+     * @param idx Start index for parameters.
+     * @param em Entry mapping.
+     * @param val Value object.
+     * @return Next index for parameters.
+     * @throws CacheException If failed to set statement parameters.
+     */
+    private int fillValueParameters(PreparedStatement stmt, int idx, EntryMapping em, Object val)
+        throws CacheException {
+        for (CacheJdbcPojoStoreTypeField field : em.uniqValFields) {
+            Object fieldVal = extractParameter(em.cacheName, em.valueType(), field.getJavaFieldName(), val);
+
+            fillParameter(stmt, idx++, field, fieldVal);
+        }
+
+        return idx;
+    }
+
+    /**
+     * Gets the data source currently set on this store.
+     *
+     * @return Data source.
+     */
+    public DataSource getDataSource() {
+        return dataSrc;
+    }
+
+    /**
+     * Sets the data source for this store.
+     *
+     * @param dataSrc Data source.
+     */
+    public void setDataSource(DataSource dataSrc) {
+        this.dataSrc = dataSrc;
+    }
+
+    /**
+     * Gets the database dialect.
+     *
+     * @return Database dialect.
+     */
+    public JdbcDialect getDialect() {
+        return dialect;
+    }
+
+    /**
+     * Sets the database dialect.
+     *
+     * @param dialect Database dialect.
+     */
+    public void setDialect(JdbcDialect dialect) {
+        this.dialect = dialect;
+    }
+
+    /**
+     * Gets the maximum number of worker threads. These threads are responsible for executing queries.
+     *
+     * @return Maximum worker thread count.
+     */
+    public int getMaximumPoolSize() {
+        return maxPoolSz;
+    }
+
+    /**
+     * Gets the maximum number of write attempts in case of a database error.
+     *
+     * @return Maximum number of write attempts.
+     */
+    public int getMaximumWriteAttempts() {
+        return maxWrtAttempts;
+    }
+
+    /**
+     * Sets the maximum number of write attempts in case of a database error.
+     *
+     * @param maxWrtAttempts Maximum number of write attempts.
+     * @return {@code this} for chaining.
+     */
+    public CacheJdbcPojoStore<K, V> setMaximumWriteAttempts(int maxWrtAttempts) {
+        this.maxWrtAttempts = maxWrtAttempts;
+
+        return this;
+    }
+
+    /**
+     * Gets the types known by the store.
+     * <p>
+     * The internal array is returned as is, without copying.
+     *
+     * @return Types known by store.
+     */
+    public CacheJdbcPojoStoreType[] getTypes() {
+        return types;
+    }
+
+    /**
+     * Sets the types the store should process.
+     * <p>
+     * The passed array is stored as is, without copying.
+     *
+     * @param types Types the store should process.
+     * @return {@code this} for chaining.
+     */
+    public CacheJdbcPojoStore<K, V> setTypes(CacheJdbcPojoStoreType... types) {
+        this.types = types;
+
+        return this;
+    }
+
+    /**
+     * Sets the maximum number of worker threads. These threads are responsible for executing queries.
+     *
+     * @param maxPoolSz Maximum worker thread count.
+     */
+    public void setMaximumPoolSize(int maxPoolSz) {
+        this.maxPoolSz = maxPoolSz;
+    }
+
+    /**
+     * Gets the maximum batch size for write and delete operations.
+     *
+     * @return Maximum batch size.
+     */
+    public int getBatchSize() {
+        return batchSz;
+    }
+
+    /**
+     * Sets the maximum batch size for write and delete operations.
+     *
+     * @param batchSz Maximum batch size.
+     */
+    public void setBatchSize(int batchSz) {
+        this.batchSz = batchSz;
+    }
+
+    /**
+     * Gets the minimum row count threshold for parallel cache loading.
+     *
+     * @return Minimum row count threshold. If {@code 0} then load sequentially.
+     */
+    public int getParallelLoadCacheMinimumThreshold() {
+        return parallelLoadCacheMinThreshold;
+    }
+
+    /**
+     * Sets the minimum row count threshold for parallel cache loading.
+     *
+     * @param parallelLoadCacheMinThreshold Minimum row count threshold. If {@code 0} then load sequentially.
+     */
+    public void setParallelLoadCacheMinimumThreshold(int parallelLoadCacheMinThreshold) {
+        this.parallelLoadCacheMinThreshold = parallelLoadCacheMinThreshold;
+    }
+
+    /**
+     * Gets the Ignite instance held by this store.
+     *
+     * @return Ignite instance.
+     */
+    private Ignite ignite() {
+        return ignite;
+    }
+
+    /**
+     * Gets the store session held by this store.
+     *
+     * @return Store session.
+     */
+    private CacheStoreSession session() {
+        return ses;
+    }
+
     /**
      * POJO methods cache.
      */
-    protected static class PojoMethodsCache {
+    private static class PojoMethodsCache {
         /** POJO class. */
-        protected final Class<?> cls;
+        private final Class<?> cls;
 
         /** Constructor for POJO object. */
         private Constructor ctor;
@@ -67,10 +1845,9 @@ public class CacheJdbcPojoStore<K, V> extends CacheAbstractJdbcStore<K, V> {
          *
          * @param clsName Class name.
          * @param fields Fields.
-         *
          * @throws CacheException If failed to construct type cache.
          */
-        public PojoMethodsCache(String clsName, Collection<CacheTypeFieldMetadata> fields) throws CacheException {
+        public PojoMethodsCache(String clsName, CacheJdbcPojoStoreTypeField[] fields) throws CacheException {
             try {
                 cls = Class.forName(clsName);
 
@@ -89,32 +1866,32 @@ public class CacheJdbcPojoStore<K, V> extends CacheAbstractJdbcStore<K, V> {
                 throw new CacheException("Failed to find default constructor for class: " + clsName, e);
             }
 
-            setters = U.newHashMap(fields.size());
+            setters = U.newHashMap(fields.length);
 
-            getters = U.newHashMap(fields.size());
+            getters = U.newHashMap(fields.length);
 
-            for (CacheTypeFieldMetadata field : fields) {
-                String prop = capitalFirst(field.getJavaName());
+            for (CacheJdbcPojoStoreTypeField field : fields) {
+                String prop = capitalFirst(field.getJavaFieldName());
 
                 try {
-                    getters.put(field.getJavaName(), cls.getMethod("get" + prop));
+                    getters.put(field.getJavaFieldName(), cls.getMethod("get" + prop));
                 }
                 catch (NoSuchMethodException ignored) {
                     try {
-                        getters.put(field.getJavaName(), cls.getMethod("is" + prop));
+                        getters.put(field.getJavaFieldName(), cls.getMethod("is" + prop));
                     }
                     catch (NoSuchMethodException e) {
                         throw new CacheException("Failed to find getter in POJO class [clsName=" + clsName +
-                            ", prop=" + field.getJavaName() + "]", e);
+                            ", prop=" + field.getJavaFieldName() + "]", e);
                     }
                 }
 
                 try {
-                    setters.put(field.getJavaName(), cls.getMethod("set" + prop, field.getJavaType()));
+                    setters.put(field.getJavaFieldName(), cls.getMethod("set" + prop, field.getJavaFieldType()));
                 }
                 catch (NoSuchMethodException e) {
                     throw new CacheException("Failed to find setter in POJO class [clsName=" + clsName +
-                        ", prop=" + field.getJavaName() + "]", e);
+                        ", prop=" + field.getJavaFieldName() + "]", e);
                 }
             }
         }
@@ -131,116 +1908,366 @@ public class CacheJdbcPojoStore<K, V> extends CacheAbstractJdbcStore<K, V> {
         }
     }
 
-    /** Methods cache. */
-    protected volatile Map<String, Map<String, PojoMethodsCache>> mtdsCache = Collections.emptyMap();
+    /**
+     * Entry mapping description.
+     */
+    private static class EntryMapping {
+        /** Cache name. */
+        private final String cacheName;
 
-    /** {@inheritDoc} */
-    @Override protected void prepareBuilders(@Nullable String cacheName, Collection<CacheTypeMetadata> types)
-        throws CacheException {
-        Map<String, PojoMethodsCache> typeMethods = U.newHashMap(types.size() * 2);
+        /** Database dialect. */
+        private final JdbcDialect dialect;
 
-        for (CacheTypeMetadata type : types) {
-            String keyType = type.getKeyType();
-            typeMethods.put(keyType, new PojoMethodsCache(keyType, type.getKeyFields()));
+        /** Select border for range queries. */
+        private final String loadCacheSelRangeQry;
 
-            String valType = type.getValueType();
-            typeMethods.put(valType, new PojoMethodsCache(valType, type.getValueFields()));
-        }
+        /** Select all items query. */
+        private final String loadCacheQry;
 
-        Map<String, Map<String, PojoMethodsCache>> newMtdsCache = new HashMap<>(mtdsCache);
+        /** Select item query. */
+        private final String loadQrySingle;
 
-        newMtdsCache.put(cacheName, typeMethods);
+        /** Select items query. */
+        private final String loadQry;
 
-        mtdsCache = newMtdsCache;
-    }
+        /** Merge item(s) query. */
+        private final String mergeQry;
 
-    /** {@inheritDoc} */
-    @Override protected <R> R buildObject(String cacheName, String typeName, Collection<CacheTypeFieldMetadata> fields,
-        Map<String, Integer> loadColIdxs, ResultSet rs) throws CacheLoaderException {
-        PojoMethodsCache mc = mtdsCache.get(cacheName).get(typeName);
+        /** Insert item query. */
+        private final String insQry;
 
-        if (mc == null)
-            throw new CacheLoaderException("Failed to find cache type metadata for type: " + typeName);
+        /** Update item query. */
+        private final String updQry;
 
-        try {
-            if (mc.simple) {
-                CacheTypeFieldMetadata field = F.first(fields);
+        /** Remove item(s) query. */
+        private final String remQry;
 
-                return (R)getColumnValue(rs, loadColIdxs.get(field.getDatabaseName()), mc.cls);
-            }
+        /** Max key count for load query per statement. */
+        private final int maxKeysPerStmt;
 
-            Object obj = mc.ctor.newInstance();
+        /** Database key columns. */
+        private final Collection<String> keyCols;
 
-            for (CacheTypeFieldMetadata field : fields) {
-                String fldJavaName = field.getJavaName();
+        /** Database key and unique value columns. */
+        private final Collection<String> cols;
 
-                Method setter = mc.setters.get(fldJavaName);
+        /** Select query columns index. */
+        private final Map<String, Integer> loadColIdxs;
 
-                if (setter == null)
-                    throw new IllegalStateException("Failed to find setter in POJO class [clsName=" + typeName +
-                        ", prop=" + fldJavaName + "]");
+        /** Unique value fields. */
+        private final Collection<CacheJdbcPojoStoreTypeField> uniqValFields;
 
-                String fldDbName = field.getDatabaseName();
+        /** Type metadata. */
+        private final CacheJdbcPojoStoreType typeMeta;
 
-                Integer colIdx = loadColIdxs.get(fldDbName);
+        /** Full table name. */
+        private final String fullTblName;
 
-                try {
-                    setter.invoke(obj, getColumnValue(rs, colIdx, field.getJavaType()));
+        /**
+         * @param cacheName Cache name.
+         * @param dialect JDBC dialect.
+         * @param typeMeta Type metadata.
+         */
+        public EntryMapping(@Nullable String cacheName, JdbcDialect dialect, CacheJdbcPojoStoreType typeMeta) {
+            this.cacheName = cacheName;
+
+            this.dialect = dialect;
+
+            this.typeMeta = typeMeta;
+
+            CacheJdbcPojoStoreTypeField[] keyFields = typeMeta.getKeyFields();
+
+            CacheJdbcPojoStoreTypeField[] valFields = typeMeta.getValueFields();
+
+            keyCols = databaseColumns(F.asList(keyFields));
+
+            uniqValFields = F.view(F.asList(valFields), new IgnitePredicate<CacheJdbcPojoStoreTypeField>() {
+                @Override public boolean apply(CacheJdbcPojoStoreTypeField col) {
+                    return !keyCols.contains(col.getDatabaseFieldName());
                 }
-                catch (Exception e) {
-                    throw new IllegalStateException("Failed to set property in POJO class [clsName=" + typeName +
-                        ", prop=" + fldJavaName + ", col=" + colIdx + ", dbName=" + fldDbName + "]", e);
+            });
+
+            String schema = typeMeta.getDatabaseSchema();
+
+            String tblName = typeMeta.getDatabaseTable();
+
+            fullTblName = F.isEmpty(schema) ? tblName : schema + "." + tblName;
+
+            Collection<String> uniqValCols = databaseColumns(uniqValFields);
+
+            cols = F.concat(false, keyCols, uniqValCols);
+
+            loadColIdxs = U.newHashMap(cols.size());
+
+            int idx = 1;
+
+            for (String col : cols)
+                loadColIdxs.put(col, idx++);
+
+            loadCacheQry = dialect.loadCacheQuery(fullTblName, cols);
+
+            loadCacheSelRangeQry = dialect.loadCacheSelectRangeQuery(fullTblName, keyCols);
+
+            loadQrySingle = dialect.loadQuery(fullTblName, keyCols, cols, 1);
+
+            maxKeysPerStmt = dialect.getMaxParameterCount() / keyCols.size();
+
+            loadQry = dialect.loadQuery(fullTblName, keyCols, cols, maxKeysPerStmt);
+
+            insQry = dialect.insertQuery(fullTblName, keyCols, uniqValCols);
+
+            updQry = dialect.updateQuery(fullTblName, keyCols, uniqValCols);
+
+            mergeQry = dialect.mergeQuery(fullTblName, keyCols, uniqValCols);
+
+            remQry = dialect.removeQuery(fullTblName, keyCols);
+        }
+
+        /**
+         * Extract database column names from {@link CacheJdbcPojoStoreTypeField}.
+         *
+         * @param dsc collection of {@link CacheJdbcPojoStoreTypeField}.
+         * @return Collection with database column names.
+         */
+        private static Collection<String> databaseColumns(Collection<CacheJdbcPojoStoreTypeField> dsc) {
+            return F.transform(dsc, new C1<CacheJdbcPojoStoreTypeField, String>() {
+                /** {@inheritDoc} */
+                @Override public String apply(CacheJdbcPojoStoreTypeField col) {
+                    return col.getDatabaseFieldName();
                 }
-            }
+            });
+        }
+
+        /**
+         * Construct query for select values with key count less or equal {@code maxKeysPerStmt}
+         *
+         * @param keyCnt Key count.
+         * @return Load query statement text.
+         */
+        private String loadQuery(int keyCnt) {
+            assert keyCnt <= maxKeysPerStmt;
+
+            if (keyCnt == maxKeysPerStmt)
+                return loadQry;
 
-            return (R)obj;
+            if (keyCnt == 1)
+                return loadQrySingle;
+
+            return dialect.loadQuery(fullTblName, keyCols, cols, keyCnt);
         }
-        catch (SQLException e) {
-            throw new CacheLoaderException("Failed to read object of class: " + typeName, e);
+
+        /**
+         * Construct query for select values in range.
+         *
+         * @param appendLowerBound Need add lower bound for range.
+         * @param appendUpperBound Need add upper bound for range.
+         * @return Query with range.
+         */
+        private String loadCacheRangeQuery(boolean appendLowerBound, boolean appendUpperBound) {
+            return dialect.loadCacheRangeQuery(fullTblName, keyCols, cols, appendLowerBound, appendUpperBound);
         }
-        catch (Exception e) {
-            throw new CacheLoaderException("Failed to construct instance of class: " + typeName, e);
+
+        /**
+         * @return Key type.
+         */
+        protected String keyType() {
+            return typeMeta.getKeyType();
         }
-    }
 
-    /** {@inheritDoc} */
-    @Nullable @Override protected Object extractParameter(String cacheName, String typeName, String fieldName,
-        Object obj)
-        throws CacheException {
-        try {
-            PojoMethodsCache mc = mtdsCache.get(cacheName).get(typeName);
+        /**
+         * @return Value type.
+         */
+        protected String valueType() {
+            return typeMeta.getValueType();
+        }
 
-            if (mc == null)
-                throw new CacheException("Failed to find cache type metadata for type: " + typeName);
+        /**
+         * Gets key columns.
+         *
+         * @return Key columns.
+         */
+        protected CacheJdbcPojoStoreTypeField[] keyColumns() {
+            return typeMeta.getKeyFields();
+        }
 
-            if (mc.simple)
-                return obj;
+        /**
+         * Gets value columns.
+         *
+         * @return Value columns.
+         */
+        protected CacheJdbcPojoStoreTypeField[] valueColumns() {
+            return typeMeta.getValueFields();
+        }
 
-            Method getter = mc.getters.get(fieldName);
+        /**
+         * Get full table name.
+         *
+         * @return &lt;schema&gt;.&lt;table name&gt;
+         */
+        protected String fullTableName() {
+            return fullTblName;
+        }
+    }
 
-            if (getter == null)
-                throw new CacheLoaderException("Failed to find getter in POJO class [clsName=" + typeName +
-                    ", prop=" + fieldName + "]");
+    /**
+     * Worker that loads cache entries by running a user-supplied SQL query and passing each row to a closure.
+     *
+     * @param <K1> Key type.
+     * @param <V1> Value type.
+     */
+    private class LoadCacheCustomQueryWorker<K1, V1> implements Callable<Void> {
+        /** Entry mapping that supplies cache name, key/value types and key/value columns for building objects. */
+        private final EntryMapping em;
 
-            return getter.invoke(obj);
+        /** User SQL query text; prepared and executed as is in {@link #call()}. */
+        private final String qry;
+
+        /** Closure applied to every key/value pair built from the query result set. */
+        private final IgniteBiInClosure<K1, V1> clo;
+
+        /**
+         * @param em Entry mapping description used to build keys and values from result rows.
+         * @param qry User query to execute.
+         * @param clo Closure that receives each loaded key/value pair.
+         */
+        private LoadCacheCustomQueryWorker(EntryMapping em, String qry, IgniteBiInClosure<K1, V1> clo) {
+            this.em = em;
+            this.qry = qry;
+            this.clo = clo;
         }
-        catch (Exception e) {
-            throw new CacheException("Failed to read object of class: " + typeName, e);
+
+        /** {@inheritDoc} Runs the query, feeds each row to the closure and returns {@code null} when done. */
+        @Override public Void call() throws Exception {
+            if (log.isDebugEnabled())
+                log.debug("Load cache using custom query [cache name= " + em.cacheName +
+                    ", key type=" + em.keyType() + ", query=" + qry + "]");
+
+            Connection conn = null;
+
+            PreparedStatement stmt = null;
+
+            try {
+                conn = openConnection(true);
+
+                stmt = conn.prepareStatement(qry);
+
+                ResultSet rs = stmt.executeQuery();
+
+                ResultSetMetaData meta = rs.getMetaData();
+
+                Map<String, Integer> colIdxs = U.newHashMap(meta.getColumnCount()); // Column label -> column index.
+
+                for (int i = 1; i <= meta.getColumnCount(); i++) // JDBC column indexes are 1-based.
+                    colIdxs.put(meta.getColumnLabel(i), i);
+
+                while (rs.next()) {
+                    K1 key = buildObject(em.cacheName, em.keyType(), em.keyColumns(), colIdxs, rs);
+                    V1 val = buildObject(em.cacheName, em.valueType(), em.valueColumns(), colIdxs, rs);
+
+                    clo.apply(key, val);
+                }
+
+                return null;
+            }
+            catch (SQLException e) {
+                throw new CacheLoaderException("Failed to execute custom query for load cache", e);
+            }
+            finally {
+                U.closeQuiet(stmt); // Per JDBC, closing a statement also closes its current result set.
+
+                U.closeQuiet(conn);
+            }
         }
     }
 
-    /** {@inheritDoc} */
-    @Override protected Object keyTypeId(Object key) throws CacheException {
-        return key.getClass();
+    /**
+     * Holder that creates its value on first access via {@link #create()} and caches it; nothing is synchronized.
+     *
+     * @param <T> Cached object type.
+     */
+    private abstract static class LazyValue<T> {
+        /** Cached value; stays {@code null} until {@link #create()} has returned a non-null result. */
+        private T val;
+
+        /**
+         * @return Newly constructed value to be cached.
+         */
+        protected abstract T create();
+
+        /**
+         * @return Cached value; calls {@link #create()} whenever the cached value is still {@code null}.
+         */
+        public T value() {
+            if (val == null)
+                val = create();
+
+            return val;
+        }
     }
 
-    /** {@inheritDoc} */
-    @Override protected Object keyTypeId(String type) throws CacheException {
-        try {
-            return Class.forName(type);
+    /**
+     * Worker that loads entries for the keys collected in {@code keys} using one statement on a given connection.
+     *
+     * @param <K1> Key type.
+     * @param <V1> Value type.
+     */
+    private class LoadWorker<K1, V1> implements Callable<Map<K1, V1>> {
+        /** Connection used to prepare the load statement; it is not closed by this worker. */
+        private final Connection conn;
+
+        /** Keys for load; the list starts empty and is pre-sized to {@code em.maxKeysPerStmt}. */
+        private final Collection<K1> keys;
+
+        /** Entry mapping that supplies the load query, key/value types, columns and column indexes. */
+        private final EntryMapping em;
+
+        /**
+         * @param conn Connection to prepare the load statement on.
+         * @param em Entry mapping description; its {@code maxKeysPerStmt} sizes the key list.
+         */
+        private LoadWorker(Connection conn, EntryMapping em) {
+            this.conn = conn;
+            this.em = em;
+
+            keys = new ArrayList<>(em.maxKeysPerStmt);
         }
-        catch (ClassNotFoundException e) {
-            throw new CacheException("Failed to find class: " + type, e);
+
+        /** {@inheritDoc} Returns the key/value entries read from the database for the collected keys. */
+        @Override public Map<K1, V1> call() throws Exception {
+            if (log.isDebugEnabled())
+                log.debug("Load values from db [table= " + em.fullTableName() +
+                    ", key count=" + keys.size() + "]");
+
+            PreparedStatement stmt = null;
+
+            try {
+                stmt = conn.prepareStatement(em.loadQuery(keys.size()));
+
+                int idx = 1; // JDBC parameter indexes are 1-based.
+
+                for (Object key : keys)
+                    for (CacheJdbcPojoStoreTypeField field : em.keyColumns()) {
+                        Object fieldVal = extractParameter(em.cacheName, em.keyType(), field.getJavaFieldName(), key);
+
+                        fillParameter(stmt, idx++, field, fieldVal);
+                    }
+
+                ResultSet rs = stmt.executeQuery();
+
+                Map<K1, V1> entries = U.newHashMap(keys.size());
+
+                while (rs.next()) {
+                    K1 key = buildObject(em.cacheName, em.keyType(), em.keyColumns(), em.loadColIdxs, rs);
+                    V1 val = buildObject(em.cacheName, em.valueType(), em.valueColumns(), em.loadColIdxs, rs);
+
+                    entries.put(key, val);
+                }
+
+                return entries;
+            }
+            finally {
+                U.closeQuiet(stmt); // Only the statement is closed here; the connection is left open.
+            }
         }
     }
-}
\ No newline at end of file
+}