You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by il...@apache.org on 2019/01/09 13:46:21 UTC

[ignite] branch master updated: IGNITE-5003 Fixed hanging parallel write&evict in CacheWriteBehindStore - Fixes #5664.

This is an automated email from the ASF dual-hosted git repository.

ilyak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e9fe14  IGNITE-5003 Fixed hanging parallel write&evict in CacheWriteBehindStore - Fixes #5664.
7e9fe14 is described below

commit 7e9fe14a31d743b3d314b7d00e23d971767659fe
Author: Andrei Aleksandrov <ae...@gmail.com>
AuthorDate: Wed Jan 9 16:44:36 2019 +0300

    IGNITE-5003 Fixed hanging parallel write&evict in CacheWriteBehindStore - Fixes #5664.
    
    Signed-off-by: Ilya Kasnacheev <il...@gmail.com>
---
 .../cache/store/GridCacheWriteBehindStore.java     | 149 ++++-
 ...JdbcPojoWriteBehindStoreWithCoalescingTest.java | 680 +++++++++++++++++++++
 .../store/jdbc/model/TestJdbcPojoDataSource.java   | 523 ++++++++++++++++
 .../jdbc/model/TestJdbcPojoDataSourceFactory.java  | 106 ++++
 .../TestJdbcPojoStoreFactoryWithHangWriteAll.java  | 136 +++++
 .../ignite/cache/store/jdbc/model/TestPojo.java    | 117 ++++
 .../store/GridCacheWriteBehindStoreSelfTest.java   |   4 +
 .../ignite/testsuites/IgniteCacheTestSuite.java    |   2 +
 8 files changed, 1692 insertions(+), 25 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java
index fdf3649..c64dbc8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java
@@ -439,10 +439,24 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
                 val.readLock().lock();
 
                 try {
-                    if (val.operation() == StoreOperation.PUT)
-                        loaded.put(key, val.entry().getValue());
+                    StoreOperation op;
+
+                    V value;
+
+                    if (writeCoalescing && val.nextOperation() != null) {
+                        op = val.nextOperation();
+
+                        value = (op == StoreOperation.PUT) ? val.nextEntry().getValue() : null;
+                    } else {
+                        op = val.operation();
+
+                        value = (op == StoreOperation.PUT) ? val.entry().getValue() : null;
+                    }
+
+                    if (op == StoreOperation.PUT)
+                        loaded.put(key, value);
                     else
-                        assert val.operation() == StoreOperation.RMV : val.operation();
+                        assert op == StoreOperation.RMV : op;
                 }
                 finally {
                     val.readLock().unlock();
@@ -484,9 +498,23 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
             val.readLock().lock();
 
             try {
-                switch (val.operation()) {
+                StoreOperation op;
+
+                V value;
+
+                if (writeCoalescing && val.nextOperation() != null) {
+                    op = val.nextOperation();
+
+                    value = (op == StoreOperation.PUT) ? val.nextEntry().getValue() : null;
+                } else {
+                    op = val.operation();
+
+                    value = (op == StoreOperation.PUT) ? val.entry().getValue() : null;
+                }
+
+                switch (op) {
                     case PUT:
-                        return val.entry().getValue();
+                        return value;
 
                     case RMV:
                         return null;
@@ -593,11 +621,14 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
             prev.writeLock().lock();
 
             try {
-                if (prev.status() == ValueStatus.PENDING) {
-                    // Flush process in progress, try again.
-                    prev.waitForFlush();
+                if (prev.status() == ValueStatus.PENDING || prev.status() == ValueStatus.PENDING_AND_UPDATED) {
+                    // Flush process in progress, save next value and update the status.
 
-                    continue;
+                    prev.setNext(newVal.val, newVal.storeOperation);
+
+                    prev.status(ValueStatus.PENDING_AND_UPDATED);
+
+                    break;
                 }
                 else if (prev.status() == ValueStatus.FLUSHED)
                     // This entry was deleted from map before we acquired the lock.
@@ -712,14 +743,22 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
         Map<K, Entry<? extends K, ? extends V>> batch = U.newLinkedHashMap(valMap.size());
 
         for (Map.Entry<K, StatefulValue<K, V>> e : valMap.entrySet()) {
-            if (operation == null)
-                operation = e.getValue().operation();
+            StatefulValue<K, V> val = e.getValue();
 
-            assert operation == e.getValue().operation();
+            val.readLock().lock();
 
-            assert e.getValue().status() == ValueStatus.PENDING;
+            try {
+                if (operation == null)
+                    operation = val.operation();
+
+                assert operation == val.operation();
 
-            batch.put(e.getKey(), e.getValue().entry());
+                assert val.status() == ValueStatus.PENDING || val.status() == ValueStatus.PENDING_AND_UPDATED;
+
+                batch.put(e.getKey(), val.entry());
+            } finally {
+                val.readLock().unlock();
+            }
         }
 
         boolean result = updateStore(operation, batch, initSes, flusher);
@@ -731,17 +770,26 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
                 val.writeLock().lock();
 
                 try {
-                    val.status(ValueStatus.FLUSHED);
-
                     if (writeCoalescing) {
-                        StatefulValue<K, V> prev = writeCache.remove(e.getKey());
+                        if (val.status() == ValueStatus.PENDING_AND_UPDATED) {
+                            val.update(val.nextEntry(), val.nextOperation(), ValueStatus.NEW);
+
+                            val.setNext(null, null);
+                        }
+                        else {
+                            val.status(ValueStatus.FLUSHED);
 
-                        // Additional check to ensure consistency.
-                        assert prev == val : "Map value for key " + e.getKey() + " was updated during flush";
+                            StatefulValue<K, V> prev = writeCache.remove(e.getKey());
+
+                            // Additional check to ensure consistency.
+                            assert prev == val : "Map value for key " + e.getKey() + " was updated during flush";
+                        }
 
                         val.signalFlushed();
                     }
                     else {
+                        val.status(ValueStatus.FLUSHED);
+
                         Flusher f = flusher(e.getKey());
 
                         // Can remove using equal because if map contains another similar value it has different state.
@@ -761,9 +809,16 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
                 val.writeLock().lock();
 
                 try {
-                    val.status(ValueStatus.RETRY);
+                    if (val.status() == ValueStatus.PENDING_AND_UPDATED) {
+                        val.update(val.nextEntry(), val.nextOperation(), ValueStatus.NEW);
 
-                    retryEntriesCnt.incrementAndGet();
+                        val.setNext(null, null);
+                    }
+                    else {
+                        val.status(ValueStatus.RETRY);
+
+                        retryEntriesCnt.incrementAndGet();
+                    }
 
                     val.signalFlushed();
                 }
@@ -1116,13 +1171,17 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
 
                     switch (addRes) {
                         case NEW_BATCH:
+                            // No need to test first value in batch
+                            val.status(ValueStatus.PENDING);
+
+                            val.writeLock().unlock();
+
                             applyBatch(pending, true, null);
 
                             pending = U.newLinkedHashMap(batchSize);
 
-                            // No need to test first value in batch
-                            val.status(ValueStatus.PENDING);
                             pending.put(e.getKey(), val);
+
                             prevOperation = val.operation();
 
                             break;
@@ -1137,7 +1196,8 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
                     }
                 }
                 finally {
-                    val.writeLock().unlock();
+                    if (val.writeLock().isHeldByCurrentThread())
+                        val.writeLock().unlock();
                 }
             }
 
@@ -1307,6 +1367,12 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
         /** Value is captured by flusher and store operation is performed at the moment. */
         PENDING,
 
+        /**
+         * Value is captured by flusher and store operation is performed at the moment.
+         * New update for the key was stored and waiting for previous store operation.
+         */
+        PENDING_AND_UPDATED,
+
         /** Store operation has failed and it will be re-tried at the next flush. */
         RETRY,
 
@@ -1335,7 +1401,7 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
      * @return {@code true} if status indicates any pending or complete store update operation.
      */
     private boolean acquired(ValueStatus status) {
-        return status == ValueStatus.PENDING || status == ValueStatus.FLUSHED;
+        return status == ValueStatus.PENDING || status == ValueStatus.FLUSHED || status == ValueStatus.PENDING_AND_UPDATED;
     }
 
     /**
@@ -1352,9 +1418,16 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
         @GridToStringInclude(sensitive = true)
         private Entry<? extends K, ? extends V> val;
 
+        /** Next value that waiting for a previous store operation. */
+        @GridToStringInclude(sensitive = true)
+        private Entry<? extends K, ? extends V> nextVal;
+
         /** Store operation. */
         private StoreOperation storeOperation;
 
+        /** Next store operation. */
+        private StoreOperation nextStoreOperation;
+
         /** Value status. */
         private ValueStatus valStatus;
 
@@ -1376,6 +1449,20 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
         }
 
         /**
+         * @return Next stored value.
+         */
+        private Entry<? extends K, ? extends V> nextEntry() {
+            return nextVal;
+        }
+
+        /**
+         * @return Store operation.
+         */
+        private StoreOperation nextOperation() {
+            return nextStoreOperation;
+        }
+
+        /**
          * @return Stored value.
          */
         private Entry<? extends K, ? extends V> entry() {
@@ -1421,6 +1508,18 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
         }
 
         /**
+         * Added next value that waiting for a previous store operation.
+         *
+         * @param val Value.
+         * @param storeOperation Store operation.
+         */
+        private void setNext(@Nullable Entry<? extends K, ? extends V> val,
+            StoreOperation storeOperation) {
+            this.nextVal = val;
+            this.nextStoreOperation = storeOperation;
+        }
+
+        /**
          * Awaits a signal on flush condition.
          *
          * @throws IgniteInterruptedCheckedException If thread was interrupted.
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoWriteBehindStoreWithCoalescingTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoWriteBehindStoreWithCoalescingTest.java
new file mode 100644
index 0000000..cbcc22f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoWriteBehindStoreWithCoalescingTest.java
@@ -0,0 +1,680 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.jdbc;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.CacheException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.store.jdbc.model.TestJdbcPojoDataSourceFactory;
+import org.apache.ignite.cache.store.jdbc.model.TestJdbcPojoStoreFactoryWithHangWriteAll;
+import org.apache.ignite.cache.store.jdbc.model.TestPojo;
+import org.apache.ignite.cache.store.jdbc.dialect.H2Dialect;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_SAFE;
+import static org.apache.ignite.configuration.DataPageEvictionMode.RANDOM_LRU;
+import static org.apache.ignite.configuration.WALMode.LOG_ONLY;
+
+/**
+ * Tests for {@link CacheJdbcPojoStore}.
+ */
+@RunWith(JUnit4.class)
+public class CacheJdbcPojoWriteBehindStoreWithCoalescingTest extends GridCommonAbstractTest {
+    /** */
+    private static final String DFLT_CONN_URL = "jdbc:h2:mem:TestDatabase;DB_CLOSE_DELAY=-1";
+
+    /** */
+    private boolean isHangOnWriteAll = false;
+
+    /** */
+    private boolean isSmallRegion = false;
+
+    /** */
+    private final AtomicBoolean testFailed = new AtomicBoolean(false);
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 20 * 60 * 1000; //20 min
+    }
+
+    /** */
+    public DataStorageConfiguration getDataStorageConfiguration() {
+        DataStorageConfiguration memCfg = new DataStorageConfiguration();
+
+        DataRegionConfiguration plc = new DataRegionConfiguration();
+
+        plc.setName("Default_Region");
+
+        plc.setPageEvictionMode(RANDOM_LRU);
+
+        if (isSmallRegion)
+            plc.setMaxSize(128L * 1024 * 1024); // 128 MB
+        else
+            plc.setMaxSize(1L * 1024 * 1024 * 1024); // 1GB
+
+        memCfg.setDefaultDataRegionConfiguration(plc);
+
+        memCfg.setWalMode(LOG_ONLY);
+
+        return memCfg;
+    }
+
+    /** */
+    public TestJdbcPojoDataSourceFactory getDataSourceFactory() {
+        TestJdbcPojoDataSourceFactory testJdbcPojoDataSourceFactory = new TestJdbcPojoDataSourceFactory();
+
+        testJdbcPojoDataSourceFactory.setURL("jdbc:h2:mem:TestDatabase;DB_CLOSE_DELAY=-1");
+
+        testJdbcPojoDataSourceFactory.setUserName("sa");
+
+        testJdbcPojoDataSourceFactory.setPassword("");
+
+        return testJdbcPojoDataSourceFactory;
+    }
+
+    /** */
+    public JdbcType getJdbcType() {
+        JdbcType type = new JdbcType();
+
+        type.setCacheName("TEST_CACHE");
+
+        type.setKeyType(Integer.class);
+
+        type.setValueType(TestPojo.class);
+
+        type.setDatabaseSchema("PUBLIC");
+
+        type.setDatabaseTable("TEST_CACHE");
+
+        type.setKeyFields(new JdbcTypeField(java.sql.Types.INTEGER, "VALUE2", Integer.class, "value2"));
+
+        type.setValueFields(
+            new JdbcTypeField(java.sql.Types.VARCHAR, "VALUE1", String.class, "value1"),
+            new JdbcTypeField(java.sql.Types.DATE, "VALUE3", java.sql.Date.class, "value3")
+        );
+
+        return type;
+    }
+
+    /** */
+    public CacheJdbcPojoStoreFactory getStoreFactory() {
+        CacheJdbcPojoStoreFactory storeFactory = new CacheJdbcPojoStoreFactory();
+
+        storeFactory.setParallelLoadCacheMinimumThreshold(100);
+
+        storeFactory.setBatchSize(100);
+
+        storeFactory.setMaximumPoolSize(4);
+
+        storeFactory.setDataSourceFactory(getDataSourceFactory());
+
+        storeFactory.setDialect(new H2Dialect());
+
+        storeFactory.setTypes(getJdbcType());
+
+        return storeFactory;
+    }
+
+    /** */
+    public CacheJdbcPojoStoreFactory getStoreFactoryWithHangWriteAll() {
+        TestJdbcPojoStoreFactoryWithHangWriteAll storeFactory = new TestJdbcPojoStoreFactoryWithHangWriteAll();
+
+        storeFactory.setParallelLoadCacheMinimumThreshold(100);
+
+        storeFactory.setBatchSize(100);
+
+        storeFactory.setMaximumPoolSize(4);
+
+        storeFactory.setDataSourceFactory(getDataSourceFactory());
+
+        storeFactory.setDialect(new H2Dialect());
+
+        storeFactory.setTypes(getJdbcType());
+
+        return storeFactory;
+    }
+
+    /** */
+    public CacheConfiguration getCacheConfiguration() {
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setName("TEST_CACHE");
+
+        ccfg.setCacheMode(REPLICATED);
+
+        ccfg.setAtomicityMode(ATOMIC);
+
+        ccfg.setPartitionLossPolicy(READ_WRITE_SAFE);
+
+        ccfg.setReadThrough(true);
+
+        ccfg.setWriteThrough(true);
+
+        ccfg.setWriteBehindEnabled(true);
+
+        ccfg.setWriteBehindBatchSize(1000);
+
+        QueryEntity queryEntity = new QueryEntity();
+
+        queryEntity.setKeyType("java.lang.Integer");
+
+        queryEntity.setValueType("org.apache.ignite.cache.store.jdbc.model.TestPojo");
+
+        queryEntity.setTableName("TEST_CACHE");
+
+        queryEntity.setKeyFieldName("value3");
+
+        Set<String> keyFiles = new HashSet<>();
+
+        keyFiles.add("value3");
+
+        queryEntity.setKeyFields(keyFiles);
+
+        LinkedHashMap<String, String> fields = new LinkedHashMap<>();
+
+        fields.put("value1", "java.lang.String");
+
+        fields.put("value2", "java.lang.Integer");
+
+        fields.put("value3", "java.sql.Date");
+
+        queryEntity.setFields(fields);
+
+        Map<String, String> aliases = new HashMap<>();
+
+        aliases.put("value1", "VALUE1");
+
+        aliases.put("value2", "VALUE2");
+
+        aliases.put("value3", "VALUE3");
+
+        queryEntity.setAliases(aliases);
+
+        ArrayList<QueryEntity> queryEntities = new ArrayList<>();
+
+        queryEntities.add(queryEntity);
+
+        ccfg.setQueryEntities(queryEntities);
+
+        return ccfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        //Data Storage
+
+        cfg.setDataStorageConfiguration(getDataStorageConfiguration());
+
+        //cache configuration
+
+        CacheConfiguration ccfg = getCacheConfiguration();
+
+        if (isHangOnWriteAll)
+            ccfg.setCacheStoreFactory(getStoreFactoryWithHangWriteAll());
+        else
+            ccfg.setCacheStoreFactory(getStoreFactory());
+
+        cfg.setCacheConfiguration(ccfg);
+
+        //discovery
+
+        TcpDiscoverySpi tcpDiscoverySpi = new TcpDiscoverySpi();
+
+        TcpDiscoveryVmIpFinder tcpDiscoveryVmIpFinder = new TcpDiscoveryVmIpFinder();
+
+        ArrayList<String> addrs = new ArrayList<>();
+
+        addrs.add("127.0.0.1:47500..47509");
+
+        tcpDiscoveryVmIpFinder.setAddresses(addrs);
+
+        tcpDiscoverySpi.setIpFinder(tcpDiscoveryVmIpFinder);
+
+        cfg.setDiscoverySpi(tcpDiscoverySpi);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        testFailed.set(false);
+
+        cleanPersistenceDir();
+
+        try {
+            Connection conn = DriverManager.getConnection(DFLT_CONN_URL, "sa", "");
+
+            Statement stmt = conn.createStatement();
+
+            stmt.executeUpdate("DROP TABLE IF EXISTS TEST_CACHE");
+
+            stmt.executeUpdate("CREATE TABLE TEST_CACHE (" +
+                " VALUE2 INTEGER PRIMARY KEY," +
+                " VALUE1 VARCHAR(50)," +
+                " VALUE3 DATE" +
+                ")"
+            );
+
+            conn.commit();
+
+            U.closeQuiet(stmt);
+
+            U.closeQuiet(conn);
+        } catch (SQLException ex) {
+            fail(ex.getMessage());
+        }
+
+        super.beforeTest();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /** */
+    public void checkCacheStore(IgniteCache<Integer, TestPojo> cache) {
+        try {
+            Connection conn = DriverManager.getConnection(DFLT_CONN_URL, "sa", "");
+
+            Statement stmt = conn.createStatement();
+
+            ResultSet rs = stmt.executeQuery(" SELECT * FROM TEST_CACHE");
+
+            int count = 0;
+
+            while (rs.next()) {
+                String value1 = rs.getString("VALUE1");
+
+                Integer value2 = rs.getInt("VALUE2");
+
+                java.sql.Date value3 = rs.getDate("VALUE3");
+
+                TestPojo pojo = cache.get(value2);
+
+                assertNotNull(pojo);
+
+                Calendar c1 = Calendar.getInstance();
+
+                c1.setTime(value3);
+
+                Calendar c2 = Calendar.getInstance();
+
+                c2.setTime(pojo.getValue3());
+
+                assertEquals(value1, pojo.getValue1());
+
+                assertEquals(value2, pojo.getValue2());
+
+                assertEquals(c1.get(Calendar.DAY_OF_YEAR), c2.get(Calendar.DAY_OF_YEAR));
+
+                assertEquals(c1.get(Calendar.YEAR), c2.get(Calendar.YEAR));
+
+                assertEquals(c1.get(Calendar.MONTH), c2.get(Calendar.MONTH));
+
+                count++;
+            }
+
+            assertEquals(count, cache.size());
+
+            U.closeQuiet(stmt);
+
+            U.closeQuiet(conn);
+        } catch (SQLException ex) {
+            fail();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testHangWriteAllWithCoalescing() throws Exception {
+        isHangOnWriteAll = true;
+
+        writeAllWithCoalescing();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testNormalWriteAllWithCoalescing() throws Exception {
+        isHangOnWriteAll = false;
+
+        writeAllWithCoalescing();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testReadWithCoalescingSmallRegionWithEviction() throws Exception {
+        isHangOnWriteAll = false;
+
+        isSmallRegion = true;
+
+        readWithCoalescing();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testReadWithCoalescingNormalRegion() throws Exception {
+        isHangOnWriteAll = false;
+
+        isSmallRegion = false;
+
+        readWithCoalescing();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testUpdateAndReadTheSameKeyWithCoalescing() throws Exception {
+        isHangOnWriteAll = false;
+
+        isSmallRegion = false;
+
+        updateAndReadWithCoalescingSameKey();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testUpdateAndReadTheSameKeyWithCoalescingHangWriteAll() throws Exception {
+        isHangOnWriteAll = true;
+
+        isSmallRegion = false;
+
+        updateAndReadWithCoalescingSameKey();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void updateAndReadWithCoalescingSameKey() throws Exception {
+        Ignite ignite = startGrid(0);
+
+        ignite.cluster().active(true);
+
+        IgniteCache<Integer, TestPojo> cache = grid(0).cache("TEST_CACHE");
+
+        AtomicInteger t1Count = new AtomicInteger(5);
+
+        AtomicInteger t2Count = new AtomicInteger(5);
+
+        Thread t1 = new Thread(new Runnable() {
+            @Override public void run() {
+                try {
+                    while (t1Count.get() > 0) {
+                        for (int i = 0; i < 200000; i++) {
+                            TestPojo next = new TestPojo("ORIGIN" + i, i, new java.sql.Date(new java.util.Date().getTime()));
+
+                            cache.put(1, next);
+
+                            TestPojo ret = cache.get(1);
+
+                            assertEquals(ret, next);
+                        }
+
+                        t1Count.decrementAndGet();
+                    }
+                } catch (CacheException e) {
+                    //ignore
+                }
+            }
+        });
+
+        Thread t2 = new Thread(new Runnable() {
+            @Override public void run() {
+                try {
+                    while (t2Count.get() > 0) {
+                        for (int i = 200000; i < 400000; i++) {
+                            TestPojo next = new TestPojo("ORIGIN" + i, i, new java.sql.Date(new java.util.Date().getTime()));
+
+                            cache.put(2, next);
+
+                            TestPojo ret = cache.get(2);
+
+                            assertEquals(ret, next);
+                        }
+
+                        t2Count.decrementAndGet();
+                    }
+                } catch (CacheException e) {
+                    //ignore
+                }
+            }
+        });
+
+        TestErrorHandler handler = new TestErrorHandler();
+
+        t1.setUncaughtExceptionHandler(handler);
+
+        t2.setUncaughtExceptionHandler(handler);
+
+        t1.start();
+
+        t2.start();
+
+        t1.join();
+
+        t2.join();
+
+        assertFalse(testFailed.get());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void readWithCoalescing() throws Exception {
+        Ignite ignite = startGrid(0);
+
+        ignite.cluster().active(true);
+
+        IgniteCache<Integer, TestPojo> cache = grid(0).cache("TEST_CACHE");
+
+        AtomicInteger t1Count = new AtomicInteger(5);
+
+        AtomicInteger t2Count = new AtomicInteger(5);
+
+        Thread t1 = new Thread(new Runnable() {
+            @Override public void run() {
+                try {
+                    while (t1Count.get() > 0) {
+                        for (int i = 0; i < 200000; i++) {
+                            TestPojo next = new TestPojo("ORIGIN" + i, i, new java.sql.Date(new java.util.Date().getTime()));
+
+                            cache.put(i, next);
+
+                            TestPojo ret = cache.get(i);
+
+                            assertEquals(ret, next);
+                        }
+
+                        t1Count.decrementAndGet();
+                    }
+                } catch (CacheException e) {
+                    //ignore
+                }
+            }
+        });
+
+        Thread t2 = new Thread(new Runnable() {
+            @Override public void run() {
+                try {
+                    while (t2Count.get() > 0) {
+                        for (int i = 200000; i < 400000; i++) {
+                            TestPojo next = new TestPojo("ORIGIN" + i, i, new java.sql.Date(new java.util.Date().getTime()));
+
+                            cache.put(i, next);
+
+                            TestPojo ret = cache.get(i);
+
+                            assertEquals(ret, next);
+                        }
+
+                        t2Count.decrementAndGet();
+                    }
+                } catch (CacheException e) {
+                    //ignore
+                }
+            }
+        });
+
+        TestErrorHandler handler = new TestErrorHandler();
+
+        t1.setUncaughtExceptionHandler(handler);
+
+        t2.setUncaughtExceptionHandler(handler);
+
+        t1.start();
+
+        t2.start();
+
+        t1.join();
+
+        t2.join();
+
+        assertFalse(testFailed.get());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void writeAllWithCoalescing() throws Exception {
+        Ignite ignite = startGrid(0);
+
+        ignite.cluster().active(true);
+
+        IgniteCache<Integer, TestPojo> cache = grid(0).cache("TEST_CACHE");
+
+        AtomicInteger t1Count = new AtomicInteger(10);
+
+        AtomicInteger t2Count = new AtomicInteger(10);
+
+        Thread t1 = new Thread(new Runnable() {
+            @Override public void run() {
+                try {
+                    while (t1Count.get() > 0) {
+                        for (int i = 0; i < 5000; i++)
+                            cache.put(i, new TestPojo("ORIGIN" + i, i, new java.sql.Date(new java.util.Date().getTime())));
+
+                        t1Count.decrementAndGet();
+                    }
+                } catch (CacheException e) {
+                    //ignore
+                }
+            }
+        });
+
+        Thread t2 = new Thread(new Runnable() {
+            @Override public void run() {
+                try {
+                    while (t2Count.get() > 0) {
+                        for (int i = 0; i < 5000; i++)
+                            cache.put(i, new TestPojo("UPDATE" + i, i, new java.sql.Date(new java.util.Date().getTime())));
+
+                        try {
+                            U.sleep(500);
+                        }
+                        catch (IgniteInterruptedCheckedException e) {
+                            e.printStackTrace();
+                        }
+
+                        t2Count.decrementAndGet();
+                    }
+                } catch (CacheException e) {
+                    //ignore
+                }
+            }
+        });
+
+        t1.start();
+
+        t2.start();
+
+        //t1 should be completed before 10 seconds.
+        U.sleep(10_000);
+
+        assertEquals(0, t1Count.get());
+
+        t1.join();
+
+        t2.join();
+
+        assertEquals(0, t2Count.get());
+
+        //now wait for updates will be done on store size and check that the data set is the same
+        if (isHangOnWriteAll)
+            //max time -> 10000 updates that will be send by 1000 batches -> (10000 / 1000) * 10 = 100 seconds.
+            U.sleep(100_000);
+        else
+            U.sleep(10_000);
+
+        checkCacheStore(cache);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private class TestErrorHandler implements Thread.UncaughtExceptionHandler {
+        /** {@inheritDoc} */
+        @Override public void uncaughtException(Thread t, Throwable e) {
+            testFailed.set(true);
+        }
+    }
+}
+
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/TestJdbcPojoDataSource.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/TestJdbcPojoDataSource.java
new file mode 100644
index 0000000..627b4a3
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/TestJdbcPojoDataSource.java
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.jdbc.model;
+
+import java.io.PrintWriter;
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.CallableStatement;
+import java.sql.Clob;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.NClob;
+import java.sql.PreparedStatement;
+import java.sql.SQLClientInfoException;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.sql.SQLWarning;
+import java.sql.SQLXML;
+import java.sql.Savepoint;
+import java.sql.Statement;
+import java.sql.Struct;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executor;
+import java.util.logging.Logger;
+import javax.sql.DataSource;
+
+/**
+ * Test JDBC POJO DataSource.
+ */
+public class TestJdbcPojoDataSource implements DataSource {
+    /** */
+    private final ThreadLocal<ConnectionHolder> holder = new ThreadLocal<ConnectionHolder>() {
+        @Override
+        protected ConnectionHolder initialValue() {
+            return new ConnectionHolder();
+        }
+    };
+
+    /** */
+    private volatile boolean perThreadMode = true;
+
+    /** */
+    private String url;
+
+    /** */
+    private String username;
+
+    /** */
+    private String password;
+
+    /** */
+    private long idleCheckTimeout = 30_000;
+
+    /** */
+    public void setDriverClassName(String driverClassName) {
+        try {
+            Class.forName(driverClassName);
+        }
+        catch (ClassNotFoundException e) {
+            throw new IllegalStateException("JDBC driver class not found: " + driverClassName, e);
+        }
+    }
+
+    /** */
+    public void setUrl(String url) {
+        this.url = url;
+    }
+
+    /** */
+    public void setUsername(String username) {
+        this.username = username;
+    }
+
+    /** */
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    /** */
+    public void setIdleCheckTimeout(long idleCheckTimeout) {
+        this.idleCheckTimeout = idleCheckTimeout;
+    }
+
+    /** */
+    void switchPerThreadMode(boolean perThreadMode) {
+        this.perThreadMode = perThreadMode;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Connection getConnection() throws SQLException {
+        if (perThreadMode) {
+            ConnectionHolder holder = this.holder.get();
+
+            holder.initIfNeeded();
+
+            return holder;
+        }
+        else
+            return createConnection();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Connection getConnection(String username, String password) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void setLoginTimeout(int seconds) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int getLoginTimeout() throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <T> T unwrap(Class<T> iface) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean isWrapperFor(Class<?> iface) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public PrintWriter getLogWriter() throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void setLogWriter(PrintWriter out) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Logger getParentLogger() throws SQLFeatureNotSupportedException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    /** {@inheritDoc} */
+    private Connection createConnection() throws SQLException {
+        return DriverManager.getConnection(url, username, password);
+    }
+
+    /** {@inheritDoc} */
+    private class ConnectionHolder implements Connection {
+        /** */
+        private Connection conn;
+
+        /** */
+        private long lastAccessTs = System.currentTimeMillis();
+
+        /** */
+        void initIfNeeded() throws SQLException {
+            if (conn == null || conn.isClosed())
+                conn = createConnection();
+            else if (System.currentTimeMillis() - lastAccessTs > idleCheckTimeout && !conn.isValid(5_000)) {
+                conn.close();
+
+                conn = createConnection();
+            }
+
+            lastAccessTs = System.currentTimeMillis();
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void close() throws SQLException {
+            if (!perThreadMode)
+                conn.close();
+        }
+
+        // --- Delegation. ---
+
+        /** {@inheritDoc} */
+        @Override
+        public boolean isClosed() throws SQLException {
+            return conn.isClosed();
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public Statement createStatement() throws SQLException {
+            return conn.createStatement();
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public PreparedStatement prepareStatement(String sql) throws SQLException {
+            return conn.prepareStatement(sql);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public CallableStatement prepareCall(String sql) throws SQLException {
+            return conn.prepareCall(sql);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public String nativeSQL(String sql) throws SQLException {
+            return conn.nativeSQL(sql);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void setAutoCommit(boolean autoCommit) throws SQLException {
+            conn.setAutoCommit(autoCommit);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public boolean getAutoCommit() throws SQLException {
+            return conn.getAutoCommit();
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void commit() throws SQLException {
+            conn.commit();
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void rollback() throws SQLException {
+            conn.rollback();
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public DatabaseMetaData getMetaData() throws SQLException {
+            return conn.getMetaData();
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void setReadOnly(boolean readOnly) throws SQLException {
+            conn.setReadOnly(readOnly);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public boolean isReadOnly() throws SQLException {
+            return conn.isReadOnly();
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void setCatalog(String catalog) throws SQLException {
+            conn.setCatalog(catalog);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public String getCatalog() throws SQLException {
+            return conn.getCatalog();
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void setTransactionIsolation(int level) throws SQLException {
+            conn.setTransactionIsolation(level);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public int getTransactionIsolation() throws SQLException {
+            return conn.getTransactionIsolation();
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public SQLWarning getWarnings() throws SQLException {
+            return conn.getWarnings();
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void clearWarnings() throws SQLException {
+            conn.clearWarnings();
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
+            return conn.createStatement(resultSetType, resultSetConcurrency);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
+            return conn.prepareStatement(sql, resultSetType, resultSetConcurrency);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
+            return conn.prepareCall(sql, resultSetType, resultSetConcurrency);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public Map<String, Class<?>> getTypeMap() throws SQLException {
+            return conn.getTypeMap();
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
+            conn.setTypeMap(map);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void setHoldability(int holdability) throws SQLException {
+            conn.setHoldability(holdability);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public int getHoldability() throws SQLException {
+            return conn.getHoldability();
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public Savepoint setSavepoint() throws SQLException {
+            return conn.setSavepoint();
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public Savepoint setSavepoint(String name) throws SQLException {
+            return conn.setSavepoint(name);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void rollback(Savepoint savepoint) throws SQLException {
+            conn.rollback(savepoint);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void releaseSavepoint(Savepoint savepoint) throws SQLException {
+            conn.releaseSavepoint(savepoint);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+            return conn.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+            return conn.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+            return conn.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
+            return conn.prepareStatement(sql, autoGeneratedKeys);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
+            return conn.prepareStatement(sql, columnIndexes);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
+            return conn.prepareStatement(sql, columnNames);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public Clob createClob() throws SQLException {
+            return conn.createClob();
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public Blob createBlob() throws SQLException {
+            return conn.createBlob();
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public NClob createNClob() throws SQLException {
+            return conn.createNClob();
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public SQLXML createSQLXML() throws SQLException {
+            return conn.createSQLXML();
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public boolean isValid(int timeout) throws SQLException {
+            return conn.isValid(timeout);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void setClientInfo(String name, String value) throws SQLClientInfoException {
+            conn.setClientInfo(name, value);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void setClientInfo(Properties properties) throws SQLClientInfoException {
+            conn.setClientInfo(properties);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public String getClientInfo(String name) throws SQLException {
+            return conn.getClientInfo(name);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public Properties getClientInfo() throws SQLException {
+            return conn.getClientInfo();
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
+            return conn.createArrayOf(typeName, elements);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
+            return conn.createStruct(typeName, attributes);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void setSchema(String schema) throws SQLException {
+            conn.setSchema(schema);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public String getSchema() throws SQLException {
+            return conn.getSchema();
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void abort(Executor executor) throws SQLException {
+            conn.abort(executor);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
+            conn.setNetworkTimeout(executor, milliseconds);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public int getNetworkTimeout() throws SQLException {
+            return conn.getNetworkTimeout();
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public <T> T unwrap(Class<T> iface) throws SQLException {
+            return conn.unwrap(iface);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public boolean isWrapperFor(Class<?> iface) throws SQLException {
+            return conn.isWrapperFor(iface);
+        }
+    }
+}
+
+
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/TestJdbcPojoDataSourceFactory.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/TestJdbcPojoDataSourceFactory.java
new file mode 100644
index 0000000..340751f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/TestJdbcPojoDataSourceFactory.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.jdbc.model;
+
+import java.util.Objects;
+import javax.cache.configuration.Factory;
+import javax.sql.DataSource;
+
+/**
+ * Test JDBC POJO DataSource factory.
+ */
+public class TestJdbcPojoDataSourceFactory implements Factory<DataSource> {
+    /** */
+    private String URL;
+
+    /** */
+    private String userName;
+
+    /** */
+    private String password;
+
+    /** {@inheritDoc} */
+    @Override public DataSource create() {
+        TestJdbcPojoDataSource ds = new TestJdbcPojoDataSource();
+
+        ds.setUrl("jdbc:h2:mem:TestDatabase;DB_CLOSE_DELAY=-1");
+
+        ds.setUsername("sa");
+
+        ds.setPassword("");
+
+        return ds;
+    }
+
+    /** */
+    public String getURL() {
+        return URL;
+    }
+
+    /** */
+    public void setURL(String URL) {
+        this.URL = URL;
+    }
+
+    /** */
+    public String getUserName() {
+        return userName;
+    }
+
+    /** */
+    public void setUserName(String userName) {
+        this.userName = userName;
+    }
+
+    /** */
+    public String getPassword() {
+        return password;
+    }
+
+    /** */
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+        TestJdbcPojoDataSourceFactory factory = (TestJdbcPojoDataSourceFactory)o;
+        return Objects.equals(URL, factory.URL) &&
+            Objects.equals(userName, factory.userName) &&
+            Objects.equals(password, factory.password);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+
+        return Objects.hash(URL, userName, password);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return "TestJdbcPojoDataSourceFactory{" +
+            "URL='" + URL + '\'' +
+            ", userName='" + userName + '\'' +
+            ", password='" + password + '\'' +
+            '}';
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/TestJdbcPojoStoreFactoryWithHangWriteAll.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/TestJdbcPojoStoreFactoryWithHangWriteAll.java
new file mode 100644
index 0000000..ebe93f3
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/TestJdbcPojoStoreFactoryWithHangWriteAll.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.jdbc.model;
+
+import java.sql.PreparedStatement;
+import java.util.Collection;
+import javax.cache.Cache;
+import javax.cache.CacheException;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import javax.sql.DataSource;
+import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStore;
+import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreFactory;
+import org.apache.ignite.cache.store.jdbc.JdbcTypeField;
+import org.apache.ignite.lang.IgniteBiInClosure;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Test JDBC POJO Store Factory With Hang WriteAll Method..
+ */
+public class TestJdbcPojoStoreFactoryWithHangWriteAll<K, V> extends CacheJdbcPojoStoreFactory<K, V> {
+    /** */
+    private static long count = 0;
+
+    /** {@inheritDoc} */
+    @Override public CacheJdbcPojoStore<K, V> create() {
+        CacheJdbcPojoStore<K, V> store = new TestJdbcPojoStoreWithHangWriteAll<>();
+
+        store.setBatchSize(getBatchSize());
+        store.setDialect(getDialect());
+        store.setMaximumPoolSize(getMaximumPoolSize());
+        store.setMaximumWriteAttempts(getMaximumWriteAttempts());
+        store.setParallelLoadCacheMinimumThreshold(getParallelLoadCacheMinimumThreshold());
+        store.setTypes(getTypes());
+        store.setHasher(getHasher());
+        store.setTransformer(getTransformer());
+        store.setSqlEscapeAll(isSqlEscapeAll());
+        store.setDataSource(getDataSourceFactory().create());
+
+        return store;
+    }
+
+    /** */
+    public static class TestJdbcPojoStoreWithHangWriteAll<K,V> extends CacheJdbcPojoStore<K,V> {
+        /** {@inheritDoc} */
+        @Override protected void fillParameter(PreparedStatement stmt, int idx, JdbcTypeField field, @Nullable Object fieldVal) throws CacheException {
+            try {
+                super.fillParameter(stmt, idx, field, fieldVal);
+            }
+            catch (Exception e) {
+                log.error("Failed to fill parameter [idx=" + idx + ", field=" + field + ", val=" + fieldVal + ']', e);
+
+                throw e;
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void loadCache(IgniteBiInClosure<K, V> clo, @Nullable Object... args) throws CacheLoaderException {
+            DataSource ds = getDataSource();
+
+            try {
+                if (ds instanceof TestJdbcPojoDataSource)
+                    ((TestJdbcPojoDataSource)ds).switchPerThreadMode(false);
+
+                super.loadCache(clo, args);
+            }
+            finally {
+                if (ds instanceof TestJdbcPojoDataSource)
+                    ((TestJdbcPojoDataSource)ds).switchPerThreadMode(true);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void delete(Object key) throws CacheWriterException {
+            try {
+                super.delete(key);
+            }
+            catch (Exception e) {
+                log.error("Failed to delete entry from cache store: " + key, e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void deleteAll(Collection<?> keys) throws CacheWriterException {
+            try {
+                super.deleteAll(keys);
+            }
+            catch (Exception e) {
+                log.error("Failed to delete entries from cache store: " + keys, e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(Cache.Entry<? extends K, ? extends V> entry) throws CacheWriterException {
+            try {
+                super.write(entry);
+            }
+            catch (Exception e) {
+                log.error("Failed to write entry to cache store: " + entry, e);
+            }
+
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeAll(Collection<Cache.Entry<? extends K, ? extends V>> entries) throws CacheWriterException {
+            try {
+                super.writeAll(entries);
+
+                Thread.sleep(10000);
+
+                count += entries.size();
+
+                log.info("Count of load data: " + count);
+            }
+            catch (Exception e) {
+                log.error("Failed to write entries to cache store: " + entries, e);
+            }
+        }
+    }
+}
+
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/TestPojo.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/TestPojo.java
new file mode 100644
index 0000000..0423629
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/TestPojo.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.jdbc.model;
+
+import java.io.Serializable;
+import java.util.Objects;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import java.sql.Date;
+
+/**
+ * Test JDBC POJO object.
+ */
+public class TestPojo implements Serializable {
+    /** */
+    @QuerySqlField
+    private String value1;
+
+    /** */
+    @QuerySqlField
+    private Integer value2;
+
+    /** */
+    @QuerySqlField
+    private Date value3;
+
+    /**
+     * Default constructor.
+     */
+    public TestPojo() {
+        // No-op.
+    }
+
+    /** */
+    public TestPojo(String value1, int value2, Date value3) {
+        this.value1 = value1;
+
+        this.value2 = value2;
+
+        this.value3 = value3;
+    }
+
+    /** */
+    public String getValue1() {
+        return value1;
+    }
+
+    /** */
+    public void setValue1(String value1) {
+        this.value1 = value1;
+    }
+
+    /** */
+    public Integer getValue2() {
+        return value2;
+    }
+
+    /** */
+    public void setValue2(Integer value2) {
+        this.value2 = value2;
+    }
+
+    /** */
+    public Date getValue3() {
+        return value3;
+    }
+
+    /** */
+    public void setValue3(Date value3) {
+        this.value3 = value3;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        TestPojo pogo = (TestPojo)o;
+
+        return Objects.equals(value1, pogo.value1) &&
+            Objects.equals(value2, pogo.value2) &&
+            Objects.equals(value3, pogo.value3);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+
+        return Objects.hash(value1, value2, value3);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return "TestPojo{" +
+            "value1='" + value1 + '\'' +
+            ", value2=" + value2 +
+            ", value3=" + value3 +
+            '}';
+    }
+}
+
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreSelfTest.java
index b242ac9..46adbde 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreSelfTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.store;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -168,12 +169,15 @@ public class GridCacheWriteBehindStoreSelfTest extends GridCacheWriteBehindStore
             assertEquals("v1", store.load(1));
             assertEquals("v2", store.load(2));
             assertNull(store.load(3));
+            assertEquals(store.loadAll(Arrays.asList(3, 4, 5)).size(), 0);
 
             store.delete(1);
 
             assertNull(store.load(1));
+            assertEquals(store.loadAll(Arrays.asList(1)).size(), 0);
             assertEquals("v2", store.load(2));
             assertNull(store.load(3));
+            assertEquals(store.loadAll(Arrays.asList(3)).size(), 0);
         }
         finally {
             shutdownStore();
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 07d26aa..dd773e1 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -35,6 +35,7 @@ import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreBinaryMarshallerStor
 import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreBinaryMarshallerWithSqlEscapeSelfTest;
 import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreMultitreadedSelfTest;
 import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreTest;
+import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoWriteBehindStoreWithCoalescingTest;
 import org.apache.ignite.cache.store.jdbc.GridCacheJdbcBlobStoreMultithreadedSelfTest;
 import org.apache.ignite.cache.store.jdbc.GridCacheJdbcBlobStoreSelfTest;
 import org.apache.ignite.cache.store.jdbc.JdbcTypesDefaultTransformerTest;
@@ -243,6 +244,7 @@ public class IgniteCacheTestSuite {
         GridTestUtils.addTestIfNeeded(suite, CacheJdbcPojoStoreBinaryMarshallerWithSqlEscapeSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, CacheJdbcPojoStoreBinaryMarshallerStoreKeepBinaryWithSqlEscapeSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, CacheJdbcPojoStoreMultitreadedSelfTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, CacheJdbcPojoWriteBehindStoreWithCoalescingTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, GridCacheBalancingStoreSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, GridCacheAffinityApiSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, GridCacheStoreValueBytesSelfTest.class, ignoredTests);